This repository has been archived by the owner on May 25, 2022. It is now read-only.

filelog: add file_name_resolved and file_path_resolved attributes #189

Merged · 6 commits · Jun 22, 2021
Changes from 4 commits
40 changes: 21 additions & 19 deletions docs/operators/file_input.md
@@ -4,24 +4,26 @@ The `file_input` operator reads logs from files. It will place the lines read in

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `file_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `include` | required | A list of file glob patterns that match the file paths to be read |
| `exclude` | [] | A list of file glob patterns to exclude from reading |
| `poll_interval` | 200ms | The duration between filesystem polls |
| `multiline` | | A `multiline` configuration block. See below for details |
| `write_to` | `$body` | The body [field](/docs/types/field.md) written to when creating a new log entry |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options |
| `include_file_name` | `true` | Whether to add the file name as the attribute `file_name` |
| `include_file_path` | `false` | Whether to add the file path as the label `file_path` |
| `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end` |
| `fingerprint_size` | `1kb` | The number of bytes with which to identify a file. The first bytes in the file are used as the fingerprint. Decreasing this value at any point will cause existing fingerprints to be forgotten, meaning that all files will be read from the beginning (one time). |
| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory |
| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently (minimum = 2). If the number of files matched in the `include` pattern exceeds half of this number, then files will be processed in batches. One batch will be processed per `poll_interval`. |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |
| Field | Default | Description |
| --- | --- | --- |
| `id` | `file_input` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `include` | required | A list of file glob patterns that match the file paths to be read |
| `exclude` | [] | A list of file glob patterns to exclude from reading |
| `poll_interval` | 200ms | The duration between filesystem polls |
| `multiline` | | A `multiline` configuration block. See below for details |
| `write_to` | `$body` | The body [field](/docs/types/field.md) written to when creating a new log entry |
| `encoding` | `utf-8` | The encoding of the file being read. See the list of supported encodings below for available options |
| `include_file_name` | `true` | Whether to add the file name as the attribute `file_name` |
| `include_file_path` | `false` | Whether to add the file path as the attribute `file_path` |
| `include_file_name_resolved` | `false` | Whether to add the file name, after symlink resolution, as the attribute `file_name_resolved` |
| `include_file_path_resolved` | `false` | Whether to add the file path, after symlink resolution, as the attribute `file_path_resolved` |
| `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end` |
| `fingerprint_size` | `1kb` | The number of bytes with which to identify a file. The first bytes in the file are used as the fingerprint. Decreasing this value at any point will cause existing fingerprints to be forgotten, meaning that all files will be read from the beginning (one time). |
| `max_log_size` | `1MiB` | The maximum size of a log entry to read before failing. Protects against reading large amounts of data into memory |
| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently (minimum = 2). If the number of files matched in the `include` pattern exceeds half of this number, then files will be processed in batches. One batch will be processed per `poll_interval`. |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource |

Note that by default, no logs will be read unless the monitored file is actively being written to because `start_at` defaults to `end`.
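As an illustrative sketch (not taken from this PR, and omitting the surrounding pipeline syntax), a configuration enabling the new resolved attributes might look like this; paths and patterns here are example values:

```yaml
type: file_input
include:
  - /var/log/app/*.log        # example glob; adjust to your layout
exclude:
  - /var/log/app/*.gz         # skip compressed rotations
start_at: beginning           # read existing content, not just new writes
include_file_path: true
include_file_name_resolved: true
include_file_path_resolved: true
```

The `*_resolved` attributes only differ from `file_name`/`file_path` when the matched path traverses one or more symlinks.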

@@ -36,7 +38,7 @@ The `multiline` configuration block must contain exactly one of `line_start_patt
match either the beginning of a new log entry, or the end of a log entry.

Also refer to the [recombine](/docs/operators/recombine.md) operator for merging events with greater control.
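For instance (a hedged sketch, not from this PR), a `multiline` block that keeps stack-trace lines attached to the preceding entry can declare where each new entry begins; the date pattern below is an assumed log format:

```yaml
multiline:
  # a new entry starts with an ISO-style date, e.g. "2021-06-22 ..."
  line_start_pattern: '^\d{4}-\d{2}-\d{2}'
```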

### File rotation

When files are rotated and their new names are no longer captured by the `include` pattern (e.g. when tailing symlinked files), data loss can result.
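One possible mitigation, assuming the rotation scheme appends a suffix to the live file's name (an assumption about your environment, not guidance from this PR): widen the glob so rotated names stay matched.

```yaml
include:
  - /var/log/app.log      # the live file (possibly a symlink)
  - /var/log/app.log.*    # rotated copies such as app.log.1
exclude:
  - /var/log/app.log.*.gz # skip compressed rotations
```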
86 changes: 51 additions & 35 deletions operator/builtin/input/file/config.go
@@ -37,15 +37,17 @@ const (
// NewInputConfig creates a new input config with default values
func NewInputConfig(operatorID string) *InputConfig {
return &InputConfig{
InputConfig: helper.NewInputConfig(operatorID, "file_input"),
PollInterval: helper.Duration{Duration: 200 * time.Millisecond},
IncludeFileName: true,
IncludeFilePath: false,
StartAt: "end",
FingerprintSize: defaultFingerprintSize,
MaxLogSize: defaultMaxLogSize,
MaxConcurrentFiles: defaultMaxConcurrentFiles,
Encoding: helper.NewEncodingConfig(),
InputConfig: helper.NewInputConfig(operatorID, "file_input"),
PollInterval: helper.Duration{Duration: 200 * time.Millisecond},
IncludeFileName: true,
IncludeFilePath: false,
IncludeFileNameResolved: false,
IncludeFilePathResolved: false,
StartAt: "end",
FingerprintSize: defaultFingerprintSize,
MaxLogSize: defaultMaxLogSize,
MaxConcurrentFiles: defaultMaxConcurrentFiles,
Encoding: helper.NewEncodingConfig(),
}
}

@@ -56,15 +58,17 @@ type InputConfig struct {
Include []string `mapstructure:"include,omitempty" json:"include,omitempty" yaml:"include,omitempty"`
Exclude []string `mapstructure:"exclude,omitempty" json:"exclude,omitempty" yaml:"exclude,omitempty"`

PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"`
IncludeFileName bool `mapstructure:"include_file_name,omitempty" json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
IncludeFilePath bool `mapstructure:"include_file_path,omitempty" json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"`
FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"`
PollInterval helper.Duration `mapstructure:"poll_interval,omitempty" json:"poll_interval,omitempty" yaml:"poll_interval,omitempty"`
Multiline helper.MultilineConfig `mapstructure:"multiline,omitempty" json:"multiline,omitempty" yaml:"multiline,omitempty"`
IncludeFileName bool `mapstructure:"include_file_name,omitempty" json:"include_file_name,omitempty" yaml:"include_file_name,omitempty"`
IncludeFilePath bool `mapstructure:"include_file_path,omitempty" json:"include_file_path,omitempty" yaml:"include_file_path,omitempty"`
IncludeFileNameResolved bool `mapstructure:"include_file_name_resolved,omitempty" json:"include_file_name_resolved,omitempty" yaml:"include_file_name_resolved,omitempty"`
IncludeFilePathResolved bool `mapstructure:"include_file_path_resolved,omitempty" json:"include_file_path_resolved,omitempty" yaml:"include_file_path_resolved,omitempty"`
StartAt string `mapstructure:"start_at,omitempty" json:"start_at,omitempty" yaml:"start_at,omitempty"`
FingerprintSize helper.ByteSize `mapstructure:"fingerprint_size,omitempty" json:"fingerprint_size,omitempty" yaml:"fingerprint_size,omitempty"`
MaxLogSize helper.ByteSize `mapstructure:"max_log_size,omitempty" json:"max_log_size,omitempty" yaml:"max_log_size,omitempty"`
MaxConcurrentFiles int `mapstructure:"max_concurrent_files,omitempty" json:"max_concurrent_files,omitempty" yaml:"max_concurrent_files,omitempty"`
Encoding helper.EncodingConfig `mapstructure:",squash,omitempty" json:",inline,omitempty" yaml:",inline,omitempty"`
}

// Build will build a file input operator from the supplied configuration
@@ -138,24 +142,36 @@ func (c InputConfig) Build(context operator.BuildContext) ([]operator.Operator,
filePathField = entry.NewAttributeField("file_path")
}

fileNameResolvedField := entry.NewNilField()
if c.IncludeFileNameResolved {
fileNameResolvedField = entry.NewAttributeField("file_name_resolved")

**Review thread on this line:**

**Member:** I think we should look into standardizing file-related attribute names as OTel semantic conventions.

**tigrannajaryan (Jun 16, 2021):** Yes. I think we need to decide on how we group these attributes, perhaps introduce a `file.*` namespace and put everything there. Perhaps some of the attributes discussed here should also be in that namespace, e.g. `stream`?

**Member:** Should it be added to https://github.com/open-telemetry/opentelemetry-specification/tree/main/specification/resource/semantic_conventions? Maybe it's just a question of where the right place to add the conventions is, but this particular location seems to imply that we are defining a file as a resource. I'm not opposed to this, but my understanding is that, in the context of the file_input operator / filelog receiver, we should not consider files to be resources. They are not the things that emitted the logs; they are essentially just a mechanism by which logs are transmitted. So just to be clear: are we talking about defining file-related fields as a resource, but then just using the same convention to structure our attributes here? Or do we need to add a parallel section to the spec which specifically defines attribute conventions for files? Maybe this isn't an important nuance, but I want to make sure we're not missing it.

**djaglowski (Jun 16, 2021):** First-pass proposal for a new `file.*` namespace:

- `file.name`
- `file.path`
- `file.name.resolved`
- `file.path.resolved`
- `file.stream`

If we can agree on a general structure here, then perhaps we can switch to that in this PR, formalize the semantic convention asynchronously, and backport to this repo if changes are made there.

**Member:** I think you are right, these are not resource attributes; they are log record attributes.

}

filePathResolvedField := entry.NewNilField()
if c.IncludeFilePathResolved {
filePathResolvedField = entry.NewAttributeField("file_path_resolved")
}

op := &InputOperator{
InputOperator: inputOperator,
Include: c.Include,
Exclude: c.Exclude,
SplitFunc: splitFunc,
PollInterval: c.PollInterval.Raw(),
FilePathField: filePathField,
FileNameField: fileNameField,
startAtBeginning: startAtBeginning,
queuedMatches: make([]string, 0),
encoding: encoding,
firstCheck: true,
cancel: func() {},
knownFiles: make([]*Reader, 0, 10),
fingerprintSize: int(c.FingerprintSize),
MaxLogSize: int(c.MaxLogSize),
MaxConcurrentFiles: c.MaxConcurrentFiles,
SeenPaths: make(map[string]struct{}, 100),
InputOperator: inputOperator,
Include: c.Include,
Exclude: c.Exclude,
SplitFunc: splitFunc,
PollInterval: c.PollInterval.Raw(),
FilePathField: filePathField,
FileNameField: fileNameField,
FilePathResolvedField: filePathResolvedField,
FileNameResolvedField: fileNameResolvedField,
startAtBeginning: startAtBeginning,
queuedMatches: make([]string, 0),
encoding: encoding,
firstCheck: true,
cancel: func() {},
knownFiles: make([]*Reader, 0, 10),
fingerprintSize: int(c.FingerprintSize),
MaxLogSize: int(c.MaxLogSize),
MaxConcurrentFiles: c.MaxConcurrentFiles,
SeenPaths: make(map[string]struct{}, 100),
}

return []operator.Operator{op}, nil
22 changes: 12 additions & 10 deletions operator/builtin/input/file/file.go
@@ -37,15 +37,17 @@ import (
type InputOperator struct {
helper.InputOperator

Include []string
Exclude []string
FilePathField entry.Field
FileNameField entry.Field
PollInterval time.Duration
SplitFunc bufio.SplitFunc
MaxLogSize int
MaxConcurrentFiles int
SeenPaths map[string]struct{}
Include []string
Exclude []string
FilePathField entry.Field
FileNameField entry.Field
FilePathResolvedField entry.Field
FileNameResolvedField entry.Field
PollInterval time.Duration
SplitFunc bufio.SplitFunc
MaxLogSize int
MaxConcurrentFiles int
SeenPaths map[string]struct{}

persister operator.Persister

@@ -316,7 +318,7 @@ func (f *InputOperator) newReader(file *os.File, fp *Fingerprint, firstCheck boo
if err != nil {
return nil, err
}
newReader.Path = file.Name()
newReader.fileAttributes = resolveFileAttributes(file.Name())
return newReader, nil
}
