From 52b1688757dbec9508ab88f33bcd377d56814fcf Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Thu, 7 Dec 2023 07:49:24 -0800 Subject: [PATCH 1/3] Consolidate attribute code into single package --- ...pkg-stanza-reader-attributes-refactor.yaml | 27 ++++ pkg/stanza/attrs/attrs.go | 21 ++- pkg/stanza/fileconsumer/attrs/attrs.go | 60 ++++++++ pkg/stanza/fileconsumer/attrs/attrs_test.go | 73 ++++++++++ pkg/stanza/fileconsumer/config.go | 63 ++++----- pkg/stanza/fileconsumer/file_test.go | 2 +- .../internal/reader/attributes_test.go | 133 ------------------ .../fileconsumer/internal/reader/factory.go | 74 +++------- .../internal/reader/factory_test.go | 69 +++------ receiver/otlpjsonfilereceiver/file_test.go | 25 ++-- 10 files changed, 264 insertions(+), 283 deletions(-) create mode 100755 .chloggen/pkg-stanza-reader-attributes-refactor.yaml create mode 100644 pkg/stanza/fileconsumer/attrs/attrs.go create mode 100644 pkg/stanza/fileconsumer/attrs/attrs_test.go delete mode 100644 pkg/stanza/fileconsumer/internal/reader/attributes_test.go diff --git a/.chloggen/pkg-stanza-reader-attributes-refactor.yaml b/.chloggen/pkg-stanza-reader-attributes-refactor.yaml new file mode 100755 index 000000000000..0b05e3f80558 --- /dev/null +++ b/.chloggen/pkg-stanza-reader-attributes-refactor.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: pkg/stanza + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecate pkg/stanza/attrs package in favor of pkg/stanza/fileconsumer/attrs + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [30449] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/pkg/stanza/attrs/attrs.go b/pkg/stanza/attrs/attrs.go index 8ef87327ac84..485fe0850daf 100644 --- a/pkg/stanza/attrs/attrs.go +++ b/pkg/stanza/attrs/attrs.go @@ -3,9 +3,22 @@ package attrs // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs" +import fca "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs" + const ( - LogFileName = "log.file.name" - LogFilePath = "log.file.path" - LogFileNameResolved = "log.file.name_resolved" - LogFilePathResolved = "log.file.path_resolved" + // Deprecated: [v0.93.0] Use pkg/stanza/fileconsumer/attrs.LogFileName instead. + // Will be removed in v0.94.0. + LogFileName = fca.LogFileName + + // Deprecated: [v0.92.0] Use pkg/stanza/fileconsumer/attrs.LogFilePath instead. + // Will be removed in v0.94.0. + LogFilePath = fca.LogFilePath + + // Deprecated: [v0.92.0] Use pkg/stanza/fileconsumer/attrs.LogFileNameResolved instead. + // Will be removed in v0.94.0. + LogFileNameResolved = fca.LogFileNameResolved + + // Deprecated: [v0.92.0] Use pkg/stanza/fileconsumer/attrs.LogFilePathResolved instead. + // Will be removed in v0.94.0. + LogFilePathResolved = fca.LogFilePathResolved ) diff --git a/pkg/stanza/fileconsumer/attrs/attrs.go b/pkg/stanza/fileconsumer/attrs/attrs.go new file mode 100644 index 000000000000..f8b4a1c47f7a --- /dev/null +++ b/pkg/stanza/fileconsumer/attrs/attrs.go @@ -0,0 +1,60 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package attrs // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs" + +import ( + "fmt" + "path/filepath" + "runtime" +) + +const ( + LogFileName = "log.file.name" + LogFilePath = "log.file.path" + LogFileNameResolved = "log.file.name_resolved" + LogFilePathResolved = "log.file.path_resolved" +) + +type Resolver struct { + IncludeFileName bool `mapstructure:"include_file_name,omitempty"` + IncludeFilePath bool `mapstructure:"include_file_path,omitempty"` + IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty"` + IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty"` +} + +func (r *Resolver) Resolve(path string) (attributes map[string]any, err error) { + // size 2 is sufficient if not resolving symlinks. This optimizes for the most performant cases. + attributes = make(map[string]any, 2) + if r.IncludeFileName { + attributes[LogFileName] = filepath.Base(path) + } + if r.IncludeFilePath { + attributes[LogFilePath] = path + } + if !r.IncludeFileNameResolved && !r.IncludeFilePathResolved { + return attributes, nil + } + + resolved := path + // Dirty solution, waiting for this permanent fix https://github.com/golang/go/issues/39786 + // EvalSymlinks on windows is partially working depending on the way you use Symlinks and Junctions + if runtime.GOOS != "windows" { + resolved, err = filepath.EvalSymlinks(path) + if err != nil { + return nil, fmt.Errorf("resolve symlinks: %w", err) + } + } + abs, err := filepath.Abs(resolved) + if err != nil { + return nil, fmt.Errorf("resolve abs: %w", err) + } + + if r.IncludeFileNameResolved { + attributes[LogFileNameResolved] = filepath.Base(abs) + } + if r.IncludeFilePathResolved { + attributes[LogFilePathResolved] = abs + } + return attributes, nil +} diff --git a/pkg/stanza/fileconsumer/attrs/attrs_test.go b/pkg/stanza/fileconsumer/attrs/attrs_test.go new file mode 100644 index 000000000000..e6c0b306f256 --- /dev/null +++ b/pkg/stanza/fileconsumer/attrs/attrs_test.go @@ -0,0 +1,73 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package attrs + +import ( + "fmt" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" +) + +func TestResolver(t *testing.T) { + t.Parallel() + + for i := 0; i < 16; i++ { + + // Create a 4 bit string where each bit represents the value of a config option + bitString := fmt.Sprintf("%04b", i) + + // Create a resolver with a config that matches the bit pattern of i + r := Resolver{ + IncludeFileName: bitString[0] == '1', + IncludeFilePath: bitString[1] == '1', + IncludeFileNameResolved: bitString[2] == '1', + IncludeFilePathResolved: bitString[3] == '1', + } + + t.Run(bitString, func(t *testing.T) { + // Create a file + tempDir := t.TempDir() + temp := filetest.OpenTemp(t, tempDir) + + attributes, err := r.Resolve(temp.Name()) + assert.NoError(t, err) + + var expectLen int + if r.IncludeFileName { + expectLen++ + assert.Equal(t, filepath.Base(temp.Name()), attributes[LogFileName]) + } else { + assert.Empty(t, attributes[LogFileName]) + } + if r.IncludeFilePath { + expectLen++ + assert.Equal(t, temp.Name(), attributes[LogFilePath]) + } else { + assert.Empty(t, attributes[LogFilePath]) + } + + // We don't have an independent way to resolve the path, so the only meangingful validate + // is to ensure that the resolver returns nothing vs something based on the config. + if r.IncludeFileNameResolved { + expectLen++ + assert.NotNil(t, attributes[LogFileNameResolved]) + assert.IsType(t, "", attributes[LogFileNameResolved]) + } else { + assert.Empty(t, attributes[LogFileNameResolved]) + } + if r.IncludeFilePathResolved { + expectLen++ + assert.NotNil(t, attributes[LogFilePathResolved]) + assert.IsType(t, "", attributes[LogFilePathResolved]) + } else { + assert.Empty(t, attributes[LogFilePathResolved]) + } + assert.Equal(t, expectLen, len(attributes)) + }) + } +} diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go index 20313ee33b12..e82719f698ff 100644 --- a/pkg/stanza/fileconsumer/config.go +++ b/pkg/stanza/fileconsumer/config.go @@ -14,6 +14,7 @@ import ( "golang.org/x/text/encoding" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" @@ -55,29 +56,28 @@ func NewConfig() *Config { MaxLogSize: reader.DefaultMaxLogSize, Encoding: defaultEncoding, FlushPeriod: reader.DefaultFlushPeriod, - IncludeFileName: true, + Resolver: attrs.Resolver{ + IncludeFileName: true, + }, } } // Config is the configuration of a file input operator type Config struct { - matcher.Criteria `mapstructure:",squash"` - PollInterval time.Duration `mapstructure:"poll_interval,omitempty"` - MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty"` - MaxBatches int `mapstructure:"max_batches,omitempty"` - StartAt string `mapstructure:"start_at,omitempty"` - FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty"` - MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"` - Encoding string `mapstructure:"encoding,omitempty"` - SplitConfig split.Config `mapstructure:"multiline,omitempty"` - TrimConfig trim.Config `mapstructure:",squash,omitempty"` - FlushPeriod time.Duration `mapstructure:"force_flush_period,omitempty"` - IncludeFileName bool `mapstructure:"include_file_name,omitempty"` - IncludeFilePath bool `mapstructure:"include_file_path,omitempty"` - IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty"` - IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty"` - Header *HeaderConfig `mapstructure:"header,omitempty"` - DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"` + matcher.Criteria `mapstructure:",squash"` + attrs.Resolver `mapstructure:",squash"` + PollInterval time.Duration `mapstructure:"poll_interval,omitempty"` + MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty"` + MaxBatches int `mapstructure:"max_batches,omitempty"` + StartAt string `mapstructure:"start_at,omitempty"` + FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty"` + MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty"` + Encoding string `mapstructure:"encoding,omitempty"` + SplitConfig split.Config `mapstructure:"multiline,omitempty"` + TrimConfig trim.Config `mapstructure:",squash,omitempty"` + FlushPeriod time.Duration `mapstructure:"force_flush_period,omitempty"` + Header *HeaderConfig `mapstructure:"header,omitempty"` + DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"` } type HeaderConfig struct { @@ -150,21 +150,18 @@ func (c Config) buildManager(logger *zap.SugaredLogger, emit emit.Callback, spli } readerFactory := reader.Factory{ - SugaredLogger: logger.With("component", "fileconsumer"), - FromBeginning: startAtBeginning, - FingerprintSize: int(c.FingerprintSize), - MaxLogSize: int(c.MaxLogSize), - Encoding: enc, - SplitFunc: splitFunc, - TrimFunc: trimFunc, - FlushTimeout: c.FlushPeriod, - EmitFunc: emit, - IncludeFileName: c.IncludeFileName, - IncludeFilePath: c.IncludeFilePath, - IncludeFileNameResolved: c.IncludeFileNameResolved, - IncludeFilePathResolved: c.IncludeFilePathResolved, - HeaderConfig: hCfg, - DeleteAtEOF: c.DeleteAfterRead, + SugaredLogger: logger.With("component", "fileconsumer"), + FromBeginning: startAtBeginning, + FingerprintSize: int(c.FingerprintSize), + MaxLogSize: int(c.MaxLogSize), + Encoding: enc, + SplitFunc: splitFunc, + TrimFunc: trimFunc, + FlushTimeout: c.FlushPeriod, + EmitFunc: emit, + Attributes: c.Resolver, + HeaderConfig: hCfg, + DeleteAtEOF: c.DeleteAfterRead, } return &Manager{ diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go index 5ec805147e43..2e203ef1ca4d 100644 --- a/pkg/stanza/fileconsumer/file_test.go +++ b/pkg/stanza/fileconsumer/file_test.go @@ -18,7 +18,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/featuregate" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader" diff --git a/pkg/stanza/fileconsumer/internal/reader/attributes_test.go b/pkg/stanza/fileconsumer/internal/reader/attributes_test.go deleted file mode 100644 index 1b8bec2569d9..000000000000 --- a/pkg/stanza/fileconsumer/internal/reader/attributes_test.go +++ /dev/null @@ -1,133 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package reader - -import ( - "context" - "os" - "path/filepath" - "runtime" - "testing" - - "github.com/stretchr/testify/require" - - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" -) - -// AddFields tests that the `log.file.name` and `log.file.path` fields are included -// when IncludeFileName and IncludeFilePath are set to true -func TestAttributes(t *testing.T) { - t.Parallel() - - // Create a file, then start - tempDir := t.TempDir() - temp := filetest.OpenTemp(t, tempDir) - filetest.WriteString(t, temp, "testlog\n") - - f, sink := testFactory(t, includeFileName(), includeFilePath()) - fp, err := f.NewFingerprint(temp) - require.NoError(t, err) - - reader, err := f.NewReader(temp, fp) - require.NoError(t, err) - defer reader.Close() - - reader.ReadToEnd(context.Background()) - - token, attributes := sink.NextCall(t) - require.Equal(t, []byte("testlog"), token) - require.Equal(t, filepath.Base(temp.Name()), attributes[attrs.LogFileName]) - require.Equal(t, temp.Name(), attributes[attrs.LogFilePath]) - require.Nil(t, attributes[attrs.LogFileNameResolved]) - require.Nil(t, attributes[attrs.LogFilePathResolved]) -} - -// TestAttributesResolved tests that the `log.file.name_resolved` and `log.file.path_resolved` fields are included -// when IncludeFileNameResolved and IncludeFilePathResolved are set to true -func TestAttributesResolved(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("Windows symlinks usage disabled for now. See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21088") - } - t.Parallel() - - // Set up actual file - actualDir := t.TempDir() - actualFile, err := os.CreateTemp(actualDir, "") - require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, actualFile.Close()) }) - - // Resolve path - realPath, err := filepath.EvalSymlinks(actualFile.Name()) - require.NoError(t, err) - resolved, err := filepath.Abs(realPath) - require.NoError(t, err) - - // Create another directory with a symbolic link to the file - symlinkDir := t.TempDir() - symlinkPath := filepath.Join(symlinkDir, "symlink") - require.NoError(t, os.Symlink(actualFile.Name(), symlinkPath)) - symlinkFile := filetest.OpenFile(t, symlinkPath) - - // Populate data - filetest.WriteString(t, actualFile, "testlog\n") - - // Read the data - f, sink := testFactory(t, includeFileName(), includeFilePath(), includeFileNameResolved(), includeFilePathResolved()) - fp, err := f.NewFingerprint(symlinkFile) - require.NoError(t, err) - reader, err := f.NewReader(symlinkFile, fp) - require.NoError(t, err) - defer reader.Close() - reader.ReadToEnd(context.Background()) - - // Validate expectations - token, attributes := sink.NextCall(t) - require.Equal(t, []byte("testlog"), token) - require.Equal(t, filepath.Base(symlinkPath), attributes[attrs.LogFileName]) - require.Equal(t, symlinkPath, attributes[attrs.LogFilePath]) - require.Equal(t, filepath.Base(resolved), attributes[attrs.LogFileNameResolved]) - require.Equal(t, resolved, attributes[attrs.LogFilePathResolved]) - - // Move symlinked file - newActualPath := actualFile.Name() + "_renamed" - require.NoError(t, os.Remove(symlinkPath)) - require.NoError(t, os.Rename(actualFile.Name(), newActualPath)) - require.NoError(t, os.Symlink(newActualPath, symlinkPath)) - - // Append additional data - filetest.WriteString(t, actualFile, "testlog2\n") - - // Recreate the reader - symlinkFile = filetest.OpenFile(t, symlinkPath) - reader, err = f.NewReaderFromMetadata(symlinkFile, reader.Close()) - require.NoError(t, err) - reader.ReadToEnd(context.Background()) - - token, attributes = sink.NextCall(t) - require.Equal(t, []byte("testlog2"), token) - require.Equal(t, filepath.Base(symlinkPath), attributes[attrs.LogFileName]) - require.Equal(t, symlinkPath, attributes[attrs.LogFilePath]) - require.Equal(t, filepath.Base(resolved)+"_renamed", attributes[attrs.LogFileNameResolved]) - require.Equal(t, resolved+"_renamed", attributes[attrs.LogFilePathResolved]) - - // Append additional data - filetest.WriteString(t, actualFile, "testlog3\n") - - // Recreate the factory with the attributes disabled - f, sink = testFactory(t) - - // Recreate the reader and read new data - symlinkFile = filetest.OpenFile(t, symlinkPath) - reader, err = f.NewReaderFromMetadata(symlinkFile, reader.Close()) - require.NoError(t, err) - reader.ReadToEnd(context.Background()) - - token, attributes = sink.NextCall(t) - require.Equal(t, []byte("testlog3"), token) - require.Nil(t, attributes[attrs.LogFileName]) - require.Nil(t, attributes[attrs.LogFilePath]) - require.Nil(t, attributes[attrs.LogFileNameResolved]) - require.Nil(t, attributes[attrs.LogFilePathResolved]) -} diff --git a/pkg/stanza/fileconsumer/internal/reader/factory.go b/pkg/stanza/fileconsumer/internal/reader/factory.go index 167acf98e63c..c962f7d4b21f 100644 --- a/pkg/stanza/fileconsumer/internal/reader/factory.go +++ b/pkg/stanza/fileconsumer/internal/reader/factory.go @@ -6,15 +6,13 @@ package reader // import "github.com/open-telemetry/opentelemetry-collector-cont import ( "bufio" "os" - "path/filepath" - "runtime" "time" "go.uber.org/zap" "golang.org/x/text/encoding" - "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/attrs" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/emit" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/header" @@ -29,20 +27,17 @@ const ( type Factory struct { *zap.SugaredLogger - HeaderConfig *header.Config - FromBeginning bool - FingerprintSize int - MaxLogSize int - Encoding encoding.Encoding - SplitFunc bufio.SplitFunc - TrimFunc trim.Func - FlushTimeout time.Duration - EmitFunc emit.Callback - IncludeFileName bool - IncludeFilePath bool - IncludeFileNameResolved bool - IncludeFilePathResolved bool - DeleteAtEOF bool + HeaderConfig *header.Config + FromBeginning bool + FingerprintSize int + MaxLogSize int + Encoding encoding.Encoding + SplitFunc bufio.SplitFunc + TrimFunc trim.Func + FlushTimeout time.Duration + EmitFunc emit.Callback + Attributes attrs.Resolver + DeleteAtEOF bool } func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) { @@ -50,7 +45,11 @@ func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error } func (f *Factory) NewReader(file *os.File, fp *fingerprint.Fingerprint) (*Reader, error) { - m := &Metadata{Fingerprint: fp, FileAttributes: map[string]any{}} + attributes, err := f.Attributes.Resolve(file.Name()) + if err != nil { + return nil, err + } + m := &Metadata{Fingerprint: fp, FileAttributes: attributes} if f.FlushTimeout > 0 { m.FlushState = &flush.State{LastDataChange: time.Now()} } @@ -92,42 +91,13 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, r.processFunc = r.headerReader.Process } - // Resolve file name and path attributes - resolved := r.fileName - - // Dirty solution, waiting for this permanent fix https://github.com/golang/go/issues/39786 - // EvalSymlinks on windows is partially working depending on the way you use Symlinks and Junctions - if runtime.GOOS != "windows" { - resolved, err = filepath.EvalSymlinks(r.fileName) - if err != nil { - f.Errorf("resolve symlinks: %w", err) - } - } - abs, err := filepath.Abs(resolved) + attributes, err := f.Attributes.Resolve(file.Name()) if err != nil { - f.Errorf("resolve abs: %w", err) + return nil, err } - - if f.IncludeFileName { - r.FileAttributes[attrs.LogFileName] = filepath.Base(r.fileName) - } else if r.FileAttributes[attrs.LogFileName] != nil { - delete(r.FileAttributes, attrs.LogFileName) - } - if f.IncludeFilePath { - r.FileAttributes[attrs.LogFilePath] = r.fileName - } else if r.FileAttributes[attrs.LogFilePath] != nil { - delete(r.FileAttributes, attrs.LogFilePath) - } - if f.IncludeFileNameResolved { - r.FileAttributes[attrs.LogFileNameResolved] = filepath.Base(abs) - } else if r.FileAttributes[attrs.LogFileNameResolved] != nil { - delete(r.FileAttributes, attrs.LogFileNameResolved) + // Copy attributes into existing map to avoid overwriting header attributes + for k, v := range attributes { + r.FileAttributes[k] = v } - if f.IncludeFilePathResolved { - r.FileAttributes[attrs.LogFilePathResolved] = abs - } else if r.FileAttributes[attrs.LogFilePathResolved] != nil { - delete(r.FileAttributes, attrs.LogFilePathResolved) - } - return r, nil } diff --git a/pkg/stanza/fileconsumer/internal/reader/factory_test.go b/pkg/stanza/fileconsumer/internal/reader/factory_test.go index 4b87c4bb7c44..39f5d8403d49 100644 --- a/pkg/stanza/fileconsumer/internal/reader/factory_test.go +++ b/pkg/stanza/fileconsumer/internal/reader/factory_test.go @@ -11,6 +11,7 @@ import ( "golang.org/x/text/encoding" "golang.org/x/text/encoding/unicode" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/emittest" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/fingerprint" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" @@ -42,37 +43,31 @@ func testFactory(t *testing.T, opts ...testFactoryOpt) (*Factory, *emittest.Sink sink := emittest.NewSink(emittest.WithCallBuffer(cfg.sinkCallBufferSize)) return &Factory{ - SugaredLogger: testutil.Logger(t), - FromBeginning: cfg.fromBeginning, - FingerprintSize: cfg.fingerprintSize, - MaxLogSize: cfg.maxLogSize, - Encoding: cfg.encoding, - SplitFunc: splitFunc, - TrimFunc: cfg.trimFunc, - FlushTimeout: cfg.flushPeriod, - EmitFunc: sink.Callback, - IncludeFileName: cfg.includeFileName, - IncludeFilePath: cfg.includeFilePath, - IncludeFileNameResolved: cfg.includeFileNameResolved, - IncludeFilePathResolved: cfg.includeFilePathResolved, + SugaredLogger: testutil.Logger(t), + FromBeginning: cfg.fromBeginning, + FingerprintSize: cfg.fingerprintSize, + MaxLogSize: cfg.maxLogSize, + Encoding: cfg.encoding, + SplitFunc: splitFunc, + TrimFunc: cfg.trimFunc, + FlushTimeout: cfg.flushPeriod, + EmitFunc: sink.Callback, + Attributes: cfg.attributes, }, sink } type testFactoryOpt func(*testFactoryCfg) type testFactoryCfg struct { - fromBeginning bool - fingerprintSize int - maxLogSize int - encoding encoding.Encoding - splitCfg split.Config - trimFunc trim.Func - flushPeriod time.Duration - sinkCallBufferSize int - includeFileName bool - includeFilePath bool - includeFileNameResolved bool - includeFilePathResolved bool + fromBeginning bool + fingerprintSize int + maxLogSize int + encoding encoding.Encoding + splitCfg split.Config + trimFunc trim.Func + flushPeriod time.Duration + sinkCallBufferSize int + attributes attrs.Resolver } func withFingerprintSize(size int) testFactoryOpt { @@ -104,27 +99,3 @@ func withSinkBufferSize(n int) testFactoryOpt { c.sinkCallBufferSize = n } } - -func includeFileName() testFactoryOpt { - return func(c *testFactoryCfg) { - c.includeFileName = true - } -} - -func includeFilePath() testFactoryOpt { - return func(c *testFactoryCfg) { - c.includeFilePath = true - } -} - -func includeFileNameResolved() testFactoryOpt { - return func(c *testFactoryCfg) { - c.includeFileNameResolved = true - } -} - -func includeFilePathResolved() testFactoryOpt { - return func(c *testFactoryCfg) { - c.includeFilePathResolved = true - } -} diff --git a/receiver/otlpjsonfilereceiver/file_test.go b/receiver/otlpjsonfilereceiver/file_test.go index 78ae69aa2018..731120679352 100644 --- a/receiver/otlpjsonfilereceiver/file_test.go +++ b/receiver/otlpjsonfilereceiver/file_test.go @@ -23,6 +23,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/attrs" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/matcher" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otlpjsonfilereceiver/internal/metadata" ) @@ -118,17 +119,19 @@ func TestFileLogsReceiver(t *testing.T) { func testdataConfigYamlAsMap() *Config { return &Config{ Config: fileconsumer.Config{ - IncludeFileName: true, - IncludeFilePath: false, - IncludeFileNameResolved: false, - IncludeFilePathResolved: false, - PollInterval: 200 * time.Millisecond, - Encoding: "utf-8", - StartAt: "end", - FingerprintSize: 1000, - MaxLogSize: 1024 * 1024, - MaxConcurrentFiles: 1024, - FlushPeriod: 500 * time.Millisecond, + Resolver: attrs.Resolver{ + IncludeFileName: true, + IncludeFilePath: false, + IncludeFileNameResolved: false, + IncludeFilePathResolved: false, + }, + PollInterval: 200 * time.Millisecond, + Encoding: "utf-8", + StartAt: "end", + FingerprintSize: 1000, + MaxLogSize: 1024 * 1024, + MaxConcurrentFiles: 1024, + FlushPeriod: 500 * time.Millisecond, Criteria: matcher.Criteria{ Include: []string{"/var/log/*.log"}, Exclude: []string{"/var/log/example.log"}, From 45c7d2da40c033a89cbfd68ece9f2182cf980108 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Tue, 16 Jan 2024 14:39:57 -0600 Subject: [PATCH 2/3] [chore][pkg/stanza] Add tests for reader.Validate --- .../internal/reader/validate_test.go | 142 ++++++++++++++++++ 1 file changed, 142 insertions(+) create mode 100644 pkg/stanza/fileconsumer/internal/reader/validate_test.go diff --git a/pkg/stanza/fileconsumer/internal/reader/validate_test.go b/pkg/stanza/fileconsumer/internal/reader/validate_test.go new file mode 100644 index 000000000000..2761e6b7fa7c --- /dev/null +++ b/pkg/stanza/fileconsumer/internal/reader/validate_test.go @@ -0,0 +1,142 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package reader + +import ( + "context" + "os" + "runtime" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/filetest" +) + +// When a file it moved, we should detect that our old handle is still valid. +func TestValidateMoved(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("Moving files while open is unsupported on Windows") + } + t.Parallel() + + tempDir := t.TempDir() + temp := filetest.OpenTemp(t, tempDir) + _, err := temp.WriteString("testlog1\n") + require.NoError(t, err) + + f, sink := testFactory(t) + fp, err := f.NewFingerprint(temp) + require.NoError(t, err) + + reader, err := f.NewReader(temp, fp) + require.NoError(t, err) + + reader.ReadToEnd(context.Background()) + sink.ExpectToken(t, []byte("testlog1")) + + // Validate before moving + assert.True(t, reader.Validate()) + + // Move the file + require.NoError(t, os.Rename(temp.Name(), temp.Name()+".old")) + + // Validate after moving + assert.True(t, reader.Validate()) + + _, err = temp.WriteString("testlog2\n") + require.NoError(t, err) + + reader.ReadToEnd(context.Background()) + sink.ExpectToken(t, []byte("testlog2")) + + // Validate after writing to the moved file + assert.True(t, reader.Validate()) +} + +func TestInvalidateTruncated(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + temp := filetest.OpenTemp(t, tempDir) + _, err := temp.WriteString("testlog1\n") + require.NoError(t, err) + + f, sink := testFactory(t) + fp, err := f.NewFingerprint(temp) + require.NoError(t, err) + + reader, err := f.NewReader(temp, fp) + require.NoError(t, err) + + reader.ReadToEnd(context.Background()) + sink.ExpectToken(t, []byte("testlog1")) + + // Validate before truncating + assert.True(t, reader.Validate()) + + // Truncate the file + require.NoError(t, temp.Truncate(0)) + + // Invalidate after truncating + assert.False(t, reader.Validate()) + + // Write different content to the file + _, err = temp.WriteString("testlog2\n") + require.NoError(t, err) + + // Still invalid + assert.False(t, reader.Validate()) +} + +func TestInvalidateClosed(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + temp := filetest.OpenTemp(t, tempDir) + _, err := temp.WriteString("testlog1\n") + require.NoError(t, err) + + f, _ := testFactory(t) + fp, err := f.NewFingerprint(temp) + require.NoError(t, err) + + reader, err := f.NewReader(temp, fp) + require.NoError(t, err) + + // Validate before closing + assert.True(t, reader.Validate()) + + // Close the file using the reader to drop the handle. + reader.Close() + + // Invalidate after closing + assert.False(t, reader.Validate()) +} + +func TestInvalidateUnreadable(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + temp := filetest.OpenTemp(t, tempDir) + _, err := temp.WriteString("testlog1\n") + require.NoError(t, err) + + f, _ := testFactory(t) + fp, err := f.NewFingerprint(temp) + require.NoError(t, err) + + reader, err := f.NewReader(temp, fp) + require.NoError(t, err) + + // Validate before closing + assert.True(t, reader.Validate()) + + // Close the file using our direct handle. The reader still has a handle but cannot be read. + require.NoError(t, temp.Close()) + + // Invalidate unreadable file + assert.False(t, reader.Validate()) +} From 20ba36ebc2c0192c4a4427bd9b83216f05856fb8 Mon Sep 17 00:00:00 2001 From: Dan Jaglowski Date: Tue, 16 Jan 2024 17:42:58 -0600 Subject: [PATCH 3/3] [chore][pkg/stanza] Removes a couple unnecessary functions --- .../fileconsumer/internal/reader/factory.go | 7 +++++-- .../fileconsumer/internal/reader/reader.go | 18 ------------------ 2 files changed, 5 insertions(+), 20 deletions(-) diff --git a/pkg/stanza/fileconsumer/internal/reader/factory.go b/pkg/stanza/fileconsumer/internal/reader/factory.go index c962f7d4b21f..a7934f4a9cbf 100644 --- a/pkg/stanza/fileconsumer/internal/reader/factory.go +++ b/pkg/stanza/fileconsumer/internal/reader/factory.go @@ -5,6 +5,7 @@ package reader // import "github.com/open-telemetry/opentelemetry-collector-cont import ( "bufio" + "fmt" "os" "time" @@ -73,9 +74,11 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader, r.lineSplitFunc = trim.WithFunc(trim.ToLength(flushFunc, f.MaxLogSize), f.TrimFunc) if !f.FromBeginning { - if err = r.offsetToEnd(); err != nil { - return nil, err + var info os.FileInfo + if info, err = r.file.Stat(); err != nil { + return nil, fmt.Errorf("stat: %w", err) } + r.Offset = info.Size() } r.emitFunc = f.EmitFunc diff --git a/pkg/stanza/fileconsumer/internal/reader/reader.go b/pkg/stanza/fileconsumer/internal/reader/reader.go index 7bd2311d8138..46ab06b222a0 100644 --- a/pkg/stanza/fileconsumer/internal/reader/reader.go +++ b/pkg/stanza/fileconsumer/internal/reader/reader.go @@ -7,7 +7,6 @@ import ( "bufio" "context" "errors" - "fmt" "os" "go.uber.org/zap" @@ -45,23 +44,6 @@ type Reader struct { deleteAtEOF bool } -// offsetToEnd sets the starting offset -func (r *Reader) offsetToEnd() error { - info, err := r.file.Stat() - if err != nil { - return fmt.Errorf("stat: %w", err) - } - r.Offset = info.Size() - return nil -} - -func (r *Reader) NewFingerprintFromFile() (*fingerprint.Fingerprint, error) { - if r.file == nil { - return nil, errors.New("file is nil") - } - return fingerprint.New(r.file, r.fingerprintSize) -} - // ReadToEnd will read until the end of the file func (r *Reader) ReadToEnd(ctx context.Context) { if _, err := r.file.Seek(r.Offset, 0); err != nil {