This repository has been archived by the owner on May 25, 2022. It is now read-only.

filelog: add file_name_resolved and file_path_resolved attributes #189

Merged
merged 6 commits on Jun 22, 2021
40 changes: 21 additions & 19 deletions docs/operators/file_input.md
@@ -4,24 +4,26 @@ The `file_input` operator reads logs from files. It will place the lines read in

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `file_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `include` | required | A list of file glob patterns that match the file paths to be read |
| `exclude` | [] | A list of file glob patterns to exclude from reading |
| `poll_interval` | 200ms | The duration between filesystem polls |
| `multiline` | | A `multiline` configuration block. See below for details |
| `write_to` | `$body` | The body [field](/docs/types/field.md) written to when creating a new log entry |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options |
| `include_file_name` | `true` | Whether to add the file name as the attribute `file_name` |
| `include_file_path` | `false` | Whether to add the file path as the attribute `file_path` |
| `include_file_name_resolved` | `false` | Whether to add the file name after symlink resolution as the attribute `file_name_resolved` |
| `include_file_path_resolved` | `false` | Whether to add the file path after symlink resolution as the attribute `file_path_resolved` |
| `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end` |
| `fingerprint_size` | `1kb` | The number of bytes with which to identify a file. The first bytes in the file are used as the fingerprint. Decreasing this value at any point will cause existing fingerprints to be forgotten, meaning that all files will be read from the beginning (one time). |
| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory |
| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently (minimum = 2). If the number of files matched in the `include` pattern exceeds half of this number, then files will be processed in batches. One batch will be processed per `poll_interval`. |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |

Note that by default, no logs will be read unless the monitored file is actively being written to because `start_at` defaults to `end`.
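For illustration, a pipeline entry that enables the new resolved attributes alongside the existing ones might look like the following sketch (the paths are hypothetical):

```yaml
- type: file_input
  include:
    - /var/log/app/current.log   # may be a symlink to the active rotated file
  include_file_path: true           # attribute: file_path (the symlink path)
  include_file_name_resolved: true  # attribute: file_name_resolved
  include_file_path_resolved: true  # attribute: file_path_resolved (the target)
  start_at: beginning
```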

@@ -36,7 +38,7 @@ The `multiline` configuration block must contain exactly one of `line_start_pattern` or `line_end_pattern`. These are regex patterns that
match either the beginning of a new log entry, or the end of a log entry.

Also refer to [recombine](/docs/operators/recombine.md) operator for merging events with greater control.
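As a sketch, a `multiline` block that starts a new entry at each ISO-style date prefix (the pattern is illustrative, not prescriptive) could look like:

```yaml
- type: file_input
  include:
    - /var/log/app/*.log
  multiline:
    # Any line beginning with YYYY-MM-DD starts a new log entry;
    # continuation lines (e.g. stack traces) are appended to it.
    line_start_pattern: '^\d{4}-\d{2}-\d{2}'
```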

### File rotation

When files are rotated and their new names are no longer captured by the `include` pattern (e.g. when tailing rotated files through a symlink), data loss can result.
90 changes: 53 additions & 37 deletions operator/builtin/input/file/config.go
@@ -37,15 +37,17 @@ const (
// NewInputConfig creates a new input config with default values
func NewInputConfig(operatorID string) *InputConfig {
return &InputConfig{
InputConfig: helper.NewInputConfig(operatorID, "file_input"),
PollInterval: helper.Duration{Duration: 200 * time.Millisecond},
IncludeFileName: true,
IncludeFilePath: false,
IncludeFileNameResolved: false,
IncludeFilePathResolved: false,
StartAt: "end",
FingerprintSize: defaultFingerprintSize,
MaxLogSize: defaultMaxLogSize,
MaxConcurrentFiles: defaultMaxConcurrentFiles,
Encoding: helper.NewEncodingConfig(),
}
}

@@ -56,15 +58,17 @@ type InputConfig struct {
Include []string `mapstructure:"include,omitempty" json:"include,omitempty" yaml:"include,omitempty"`
Exclude []string `mapstructure:"exclude,omitempty" json:"exclude,omitempty" yaml:"exclude,omitempty"`

PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"`
IncludeFileName bool `mapstructure:"include_file_name,omitempty" json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
IncludeFilePath bool `mapstructure:"include_file_path,omitempty" json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty" json:"include_file_name_resolved,omitempty" yaml:"include_file_name_resolved,omitempty"`
IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty" json:"include_file_path_resolved,omitempty" yaml:"include_file_path_resolved,omitempty"`
StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"`
FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"`
}

// Build will build a file input operator from the supplied configuration
@@ -130,32 +134,44 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,

fileNameField := entry.NewNilField()
if c.IncludeFileName {
fileNameField = entry.NewAttributeField("file.name")
}

filePathField := entry.NewNilField()
if c.IncludeFilePath {
filePathField = entry.NewAttributeField("file.path")
}

fileNameResolvedField := entry.NewNilField()
if c.IncludeFileNameResolved {
fileNameResolvedField = entry.NewAttributeField("file.name.resolved")
}

filePathResolvedField := entry.NewNilField()
if c.IncludeFilePathResolved {
filePathResolvedField = entry.NewAttributeField("file.path.resolved")
}

op := &InputOperator{
InputOperator: inputOperator,
Include: c.Include,
Exclude: c.Exclude,
SplitFunc: splitFunc,
PollInterval: c.PollInterval.Raw(),
FilePathField: filePathField,
FileNameField: fileNameField,
FilePathResolvedField: filePathResolvedField,
FileNameResolvedField: fileNameResolvedField,
startAtBeginning: startAtBeginning,
queuedMatches: make([]string, 0),
encoding: encoding,
firstCheck: true,
cancel: func() {},
knownFiles: make([]*Reader, 0, 10),
fingerprintSize: int(c.FingerprintSize),
MaxLogSize: int(c.MaxLogSize),
MaxConcurrentFiles: c.MaxConcurrentFiles,
SeenPaths: make(map[string]struct{}, 100),
}

return []operator.Operator{op}, nil
2 changes: 1 addition & 1 deletion operator/builtin/input/file/config_test.go
@@ -541,7 +541,7 @@ func TestBuild(t *testing.T) {
require.Equal(t, f.OutputOperators[0], fakeOutput)
require.Equal(t, f.Include, []string{"/var/log/testpath.*"})
require.Equal(t, f.FilePathField, entry.NewNilField())
require.Equal(t, f.FileNameField, entry.NewAttributeField("file.name"))
require.Equal(t, f.PollInterval, 10*time.Millisecond)
},
},
22 changes: 12 additions & 10 deletions operator/builtin/input/file/file.go
@@ -37,15 +37,17 @@ import (
type InputOperator struct {
helper.InputOperator

Include []string
Exclude []string
FilePathField entry.Field
FileNameField entry.Field
FilePathResolvedField entry.Field
FileNameResolvedField entry.Field
PollInterval time.Duration
SplitFunc bufio.SplitFunc
MaxLogSize int
MaxConcurrentFiles int
SeenPaths map[string]struct{}

persister operator.Persister

@@ -316,7 +318,7 @@ func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck boo
if err != nil {
return nil, err
}
newReader.fileAttributes = f.resolveFileAttributes(file.Name())
return newReader, nil
}
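`resolveFileAttributes` is defined elsewhere in this change and is not shown in this hunk. A plausible sketch of what it computes, assuming a `fileAttributes` holder for the four values (the struct shape and fallback behavior here are illustrative assumptions, not the PR's exact code):

```go
package main

import (
	"fmt"
	"path/filepath"
)

// fileAttributes mirrors the four values the operator can attach to an
// entry; the field names are an assumption for this sketch.
type fileAttributes struct {
	Name         string
	Path         string
	NameResolved string
	PathResolved string
}

// resolveFileAttributes keeps the path as observed for file_name/file_path,
// and follows symlinks for the *_resolved variants.
func resolveFileAttributes(path string) fileAttributes {
	resolved, err := filepath.EvalSymlinks(path)
	if err != nil {
		// If resolution fails (e.g. the file vanished between poll cycles),
		// fall back to the unresolved path rather than dropping the attributes.
		resolved = path
	}
	if abs, err := filepath.Abs(resolved); err == nil {
		resolved = abs
	}
	return fileAttributes{
		Name:         filepath.Base(path),
		Path:         path,
		NameResolved: filepath.Base(resolved),
		PathResolved: resolved,
	}
}

func main() {
	fmt.Printf("%+v\n", resolveFileAttributes("/var/log/test.log"))
}
```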
