diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6f219db6f7f..f50dcd80647 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -462,6 +462,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Add experimental dataset sonicwall/firewall for Sonicwall Firewalls logs {pull}19713[19713] - Add experimental dataset squid/log for Squid Proxy Server logs {pull}19713[19713] - Add experimental dataset zscaler/zia for Zscaler Internet Access logs {pull}19713[19713] +- Add initial support for configurable file identity tracking. {pull}18748[18748] *Heartbeat* diff --git a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl index ae4816f4f82..c920b7dbec8 100644 --- a/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl +++ b/filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl @@ -50,6 +50,10 @@ filebeat.inputs: # are matching any regular expression from the list. By default, no files are dropped. #exclude_files: ['.gz$'] + # Method to determine if two files are the same or not. By default + # the Beat considers two files the same if their inode and device id are the same. + #file_identity.native: ~ + # Optional additional fields. These fields can be freely picked # to add additional information to the crawled log files for filtering #fields: diff --git a/filebeat/docs/inputs/input-common-file-options.asciidoc b/filebeat/docs/inputs/input-common-file-options.asciidoc index 1947e7cdda7..9bd93e97efc 100644 --- a/filebeat/docs/inputs/input-common-file-options.asciidoc +++ b/filebeat/docs/inputs/input-common-file-options.asciidoc @@ -129,6 +129,10 @@ file is renamed or moved in such a way that it's no longer matched by the file patterns specified for the , the file will not be picked up again. {beatname_uc} will not finish reading the file. +Do not use this option when `path` based `file_identity` is configured. It does +not make sense to enable the option, as Filebeat cannot detect renames using +path names as unique identifiers. + WINDOWS: If your Windows log rotation system shows errors because it can't rotate the files, you should enable this option. @@ -397,3 +401,44 @@ file that hasn't been harvested for a longer period of time. This configuration option applies per input. You can use this option to indirectly set higher priorities on certain inputs by assigning a higher limit of harvesters. + +[float] +===== `file_identity` + +Different `file_identity` methods can be configured to suit the +environment where you are collecting log messages. + + +*`native`*:: The default behaviour of {beatname_uc} is to differentiate +between files using their inodes and device ids. + +[source,yaml] +---- +file_identity.native: ~ +---- + +*`path`*:: To identify files based on their paths use this strategy. + +WARNING: Only use this strategy if your log files are rotated to a folder +outside of the scope of your input or not at all. Otherwise you end up +with duplicated events. + +WARNING: This strategy does not support renaming files. +If an input file is renamed, {beatname_uc} will read it again if the new path +matches the settings of the input. + +[source,yaml] +---- +file_identity.path: ~ +---- + +*`inode_marker`*:: If the device id changes from time to time, you must use +this method to distinguish files. This option is not supported on Windows. 
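+The content of the marker file must be unique to the device, for example +the UUID of the device or of the mountpoint where the logs are stored.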
+ +Set the location of the marker file the following way: + +[source,yaml] +---- +file_identity.inode_marker.path: /logs/.filebeat-marker +---- + diff --git a/filebeat/docs/inputs/input-log.asciidoc b/filebeat/docs/inputs/input-log.asciidoc index 95670734b02..0a69a9b65c2 100644 --- a/filebeat/docs/inputs/input-log.asciidoc +++ b/filebeat/docs/inputs/input-log.asciidoc @@ -57,6 +57,55 @@ multiple input sections: IMPORTANT: Make sure a file is not defined more than once across all inputs because this can lead to unexpected behaviour. +[[file-identity]] +==== Reading files on network shares and cloud providers + +WARNING: Filebeat does not support reading from network shares and cloud providers. + +However, one of the limitations of these data sources can be mitigated +if you configure Filebeat appropriately. + +By default, {beatname_uc} identifies files based on their inodes and +device IDs. However, on network shares and cloud providers these +values might change during the lifetime of the file. If this happens, +{beatname_uc} thinks the file is new and resends the whole content +of the file. To solve this problem, you can configure the `file_identity` option. Possible +values besides the default `native` (inode and device ID) are `path` and `inode_marker`. + +Selecting `path` instructs {beatname_uc} to identify files based on their +paths. This is a quick way to avoid rereading files if inode and device ids +might change. However, keep in mind that if the files are rotated (renamed), they +will be reread and resubmitted. + +The option `inode_marker` can be used if the inodes stay the same even if +the device id is changed. If your files are rotated, you should choose this method +instead of `path` if possible. You have to configure a marker file +readable by {beatname_uc} and set its path in the `path` option of `inode_marker`. + +The content of this file must be unique to the device. You can use the +UUID of the device or mountpoint where the input is stored. Please note that +you should not use this method on Windows, as file identifiers might be +more volatile. The following example one-liner generates a hidden marker file +for the selected mountpoint `/logs`: + +["source","sh",subs="attributes"] +---- +$ lsblk -o MOUNTPOINT,UUID | grep /logs | awk '{print $2}' >> /logs/.filebeat-marker +---- + +To set the generated file as a marker for `file_identity`, you should configure +the input the following way: + +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: log + paths: + - /logs/*.log + file_identity.inode_marker.path: /logs/.filebeat-marker +---- + + [[rotating-logs]] ==== Reading from rotating logs @@ -66,6 +115,10 @@ a pattern that matches the file you want to harvest and all of its rotated files. Also make sure your log rotation strategy prevents lost or duplicate messages. For more information, see <>. +Furthermore, to avoid duplicate events from rotated log files, do not use the +`path` method for `file_identity`, or exclude the rotated files with the `exclude_files` +option. + [id="{beatname_lc}-input-{type}-options"] ==== Configuration options diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index e57e9cbfd43..f52b731140f 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -437,6 +437,10 @@ filebeat.inputs: # are matching any regular expression from the list. By default, no files are dropped. #exclude_files: ['.gz$'] + # Method to determine if two files are the same or not.
By default + # the Beat considers two files the same if their inode and device id are the same. + #file_identity.native: ~ + # Optional additional fields. These fields can be freely picked # to add additional information to the crawled log files for filtering #fields: diff --git a/filebeat/input/file/file.go b/filebeat/input/file/file.go index 676a2d5cfcb..963a1015fb8 100644 --- a/filebeat/input/file/file.go +++ b/filebeat/input/file/file.go @@ -30,12 +30,8 @@ type File struct { State *State } -// Checks if the two files are the same. -func (f *File) IsSameFile(f2 *File) bool { - return os.SameFile(f.FileInfo, f2.FileInfo) -} - // IsSameFile checks if the given File path corresponds with the FileInfo given +// It is used to check if the file has been renamed. func IsSameFile(path string, info os.FileInfo) bool { fileInfo, err := os.Stat(path) diff --git a/filebeat/input/file/identifier.go b/filebeat/input/file/identifier.go new file mode 100644 index 00000000000..c16535f3e19 --- /dev/null +++ b/filebeat/input/file/identifier.go @@ -0,0 +1,121 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package file + +import ( + "fmt" + "strconv" + "strings" + + "github.com/mitchellh/hashstructure" + + "github.com/elastic/beats/v7/libbeat/common" +) + +const ( + nativeName = "native" + pathName = "path" + inodeMarkerName = "inode_marker" + + DefaultIdentifierName = nativeName + identitySep = "::" +) + +var ( + identifierFactories = map[string]IdentifierFactory{ + nativeName: newINodeDeviceIdentifier, + pathName: newPathIdentifier, + inodeMarkerName: newINodeMarkerIdentifier, + } +) + +type IdentifierFactory func(*common.Config) (StateIdentifier, error) + +// StateIdentifier generates an ID for a State. +type StateIdentifier interface { + // GenerateID generates and returns the ID of the state and its type + GenerateID(State) (id, identifierType string) +} + +// NewStateIdentifier creates a new state identifier for a log input. 
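+// For example, a namespace parsed from `file_identity.path: ~` selects the +// path-based identifier, while a nil namespace falls back to the default +// inode and device id ("native") identifier.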
+func NewStateIdentifier(ns *common.ConfigNamespace) (StateIdentifier, error) { + if ns == nil { + return newINodeDeviceIdentifier(nil) + } + + identifierType := ns.Name() + f, ok := identifierFactories[identifierType] + if !ok { + return nil, fmt.Errorf("no such file_identity generator: %s", identifierType) + } + + return f(ns.Config()) +} + +type inodeDeviceIdentifier struct { + name string +} + +func newINodeDeviceIdentifier(_ *common.Config) (StateIdentifier, error) { + return &inodeDeviceIdentifier{ + name: nativeName, + }, nil +} + +func (i *inodeDeviceIdentifier) GenerateID(s State) (id, identifierType string) { + stateID := i.name + identitySep + s.FileStateOS.String() + return genIDWithHash(s.Meta, stateID), i.name +} + +type pathIdentifier struct { + name string +} + +func newPathIdentifier(_ *common.Config) (StateIdentifier, error) { + return &pathIdentifier{ + name: pathName, + }, nil +} + +func (p *pathIdentifier) GenerateID(s State) (id, identifierType string) { + stateID := p.name + identitySep + s.Source + return genIDWithHash(s.Meta, stateID), p.name +} + +func genIDWithHash(meta map[string]string, fileID string) string { + if len(meta) == 0 { + return fileID + } + + hashValue, _ := hashstructure.Hash(meta, nil) + var hashBuf [17]byte + hash := strconv.AppendUint(hashBuf[:0], hashValue, 16) + hash = append(hash, '-') + + var b strings.Builder + b.Grow(len(hash) + len(fileID)) + b.Write(hash) + b.WriteString(fileID) + + return b.String() +} + +// mockIdentifier is used for testing +type MockIdentifier struct{} + +func (m *MockIdentifier) GenerateID(s State) (string, string) { return s.Id, "mock" } diff --git a/filebeat/input/file/identifier_inode_deviceid.go b/filebeat/input/file/identifier_inode_deviceid.go new file mode 100644 index 00000000000..f5e191744d6 --- /dev/null +++ b/filebeat/input/file/identifier_inode_deviceid.go @@ -0,0 +1,98 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// +build !windows + +package file + +import ( + "fmt" + "io/ioutil" + "os" + "path/filepath" + "time" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" +) + +type inodeMarkerIdentifier struct { + log *logp.Logger + name string + markerPath string + + markerFileLastModification time.Time + markerTxt string +} + +func newINodeMarkerIdentifier(cfg *common.Config) (StateIdentifier, error) { + var config struct { + MarkerPath string `config:"path" validate:"required"` + } + err := cfg.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("error while reading configuration of inode marker file: %v", err) + } + + fi, err := os.Stat(config.MarkerPath) + if err != nil { + return nil, fmt.Errorf("error while opening marker file at %s: %v", config.MarkerPath, err) + } + markerContent, err := ioutil.ReadFile(config.MarkerPath) + if err != nil { + return nil, fmt.Errorf("error while reading marker file at %s: %v", config.MarkerPath, err) + } + return &inodeMarkerIdentifier{ + log: logp.NewLogger("inode_marker_identifier_" + filepath.Base(config.MarkerPath)), + name: inodeMarkerName, + markerPath: config.MarkerPath, + markerFileLastModification: fi.ModTime(), + markerTxt: string(markerContent), + }, nil +} + +func (i *inodeMarkerIdentifier) markerContents() string { + f, err := os.Open(i.markerPath) + if err != nil { + i.log.Errorf("Failed to open marker file %s: %v", i.markerPath, err) + return "" + } + defer f.Close() + + fi, err := f.Stat() + if err != nil { + i.log.Errorf("Failed to fetch file information for %s: %v", i.markerPath, err) + return "" + } + if i.markerFileLastModification.Before(fi.ModTime()) { + contents, err := ioutil.ReadFile(i.markerPath) + if err != nil { + i.log.Errorf("Error while reading contents of marker file: %v", err) + return "" + } + i.markerTxt = string(contents) + } + + return i.markerTxt +} + +func (i *inodeMarkerIdentifier) GenerateID(s State) (id, identifierType string) { + m := i.markerContents() + + stateID := fmt.Sprintf("%s%s%s-%s", i.name, identitySep, s.FileStateOS.InodeString(), m) + return genIDWithHash(s.Meta, stateID), i.name +} diff --git a/filebeat/input/file/file_test.go b/filebeat/input/file/identifier_inode_deviceid_windows.go similarity index 50% rename from filebeat/input/file/file_test.go rename to filebeat/input/file/identifier_inode_deviceid_windows.go index 1e2bc94d4bf..9fb1152a33c 100644 --- a/filebeat/input/file/file_test.go +++ b/filebeat/input/file/identifier_inode_deviceid_windows.go @@ -15,49 +15,16 @@ // specific language governing permissions and limitations // under the License.
-// +build !integration +// +build windows package file import ( - "os" - "path/filepath" - "testing" + "fmt" - "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/libbeat/common" ) -func TestIsSameFile(t *testing.T) { - absPath, err := filepath.Abs("../../tests/files/") - - assert.NotNil(t, absPath) - assert.Nil(t, err) - - fileInfo1, err := os.Stat(absPath + "/logs/test.log") - fileInfo2, err := os.Stat(absPath + "/logs/system.log") - - assert.Nil(t, err) - assert.NotNil(t, fileInfo1) - assert.NotNil(t, fileInfo2) - - file1 := &File{ - FileInfo: fileInfo1, - } - - file2 := &File{ - FileInfo: fileInfo2, - } - - file3 := &File{ - FileInfo: fileInfo2, - } - - assert.False(t, file1.IsSameFile(file2)) - assert.False(t, file2.IsSameFile(file1)) - - assert.True(t, file1.IsSameFile(file1)) - assert.True(t, file2.IsSameFile(file2)) - - assert.True(t, file3.IsSameFile(file2)) - assert.True(t, file2.IsSameFile(file3)) +func newINodeMarkerIdentifier(cfg *common.Config) (StateIdentifier, error) { + return nil, fmt.Errorf("inode_marker is not supported on Windows") } diff --git a/filebeat/input/file/identifier_test.go b/filebeat/input/file/identifier_test.go new file mode 100644 index 00000000000..f47f4a37fb9 --- /dev/null +++ b/filebeat/input/file/identifier_test.go @@ -0,0 +1,199 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +// +build !windows + +package file + +import ( + "fmt" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/common/file" +) + +type stateTestCase struct { + states [2]State + isSame bool +} + +func TestINodeDeviceIdentifier(t *testing.T) { + tests := map[string]stateTestCase{ + "two states pointing to the same file": { + [2]State{ + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + State{ + Source: "/path/to/this/file/2", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + }, + true, + }, + "two states pointing to different files": { + [2]State{ + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + State{ + Source: "/path/to/this/file/2", + FileStateOS: file.StateOS{Inode: 2, Device: 1}, + }, + }, + false, + }, + } + + identifier, _ := newINodeDeviceIdentifier(nil) + for name, test := range tests { + test := test + for i := 0; i < len(test.states); i++ { + test.states[i].Id, test.states[i].IdentifierName = identifier.GenerateID(test.states[i]) + } + + t.Run(name, func(t *testing.T) { + isSame := test.states[0].IsEqual(&test.states[1]) + assert.Equal(t, isSame, test.isSame) + }) + } +} + +func TestPathIdentifier(t *testing.T) { + tests := map[string]stateTestCase{ + "two states pointing to the same file": { + [2]State{ + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + }, + true, + }, + "two states pointing to different files": { + [2]State{ + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + State{ + Source: "/path/to/this/file/2", + FileStateOS: file.StateOS{Inode: 2, Device: 1}, + }, + }, + false, + }, + } + + identifier, _ := newPathIdentifier(nil) + for name, test := range tests { + test := test + for i := 0; i < len(test.states); i++ { + test.states[i].Id, test.states[i].IdentifierName = identifier.GenerateID(test.states[i]) + } + t.Run(name, func(t *testing.T) { + isSame := test.states[0].IsEqual(&test.states[1]) + assert.Equal(t, isSame, test.isSame) + }) + } +} + +func TestInodeMarkerIdentifier(t *testing.T) { + tests := map[string]stateTestCase{ + "two states pointing to the same file i.": { + [2]State{ + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + }, + true, + }, + "two states pointing to the same file ii.": { + [2]State{ + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 2}, + }, + }, + true, + }, + "two states pointing to different files i.": { + [2]State{ + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + State{ + Source: "/path/to/this/file/2", + FileStateOS: file.StateOS{Inode: 2, Device: 1}, + }, + }, + false, + }, + "two states pointing to different files ii.": { + [2]State{ + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 1, Device: 1}, + }, + State{ + Source: "/path/to/this/file/1", + FileStateOS: file.StateOS{Inode: 2, Device: 3}, + }, + }, + false, + }, + } + + identifier := newMockInodeMarkerIdentifier() + for name, test
:= range tests { + test := test + for i := 0; i < len(test.states); i++ { + test.states[i].Id, test.states[i].IdentifierName = identifier.GenerateID(test.states[i]) + } + t.Run(name, func(t *testing.T) { + isSame := test.states[0].IsEqual(&test.states[1]) + assert.Equal(t, isSame, test.isSame) + }) + } +} + +func newMockInodeMarkerIdentifier() StateIdentifier { + cfg := common.MustNewConfigFrom(map[string]string{"path": filepath.Join("testdata", "identifier_marker")}) + i, err := newINodeMarkerIdentifier(cfg) + fmt.Println(err) + return i +} diff --git a/filebeat/input/file/identifier_test_windows.go b/filebeat/input/file/identifier_test_windows.go new file mode 100644 index 00000000000..544dbad2546 --- /dev/null +++ b/filebeat/input/file/identifier_test_windows.go @@ -0,0 +1,29 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build windows + +package file + +import "testing" + +func TestInodeMarkerError(t *testing.T) { + _, err := newINodeMarkerIdentifier(nil) + if err == nil { + t.Fatal("inode_marker should not be supported on windows") + } +} diff --git a/filebeat/input/file/state.go b/filebeat/input/file/state.go index ef255243b4c..18fcfee583b 100644 --- a/filebeat/input/file/state.go +++ b/filebeat/input/file/state.go @@ -19,35 +19,34 @@ package file import ( "os" - "strconv" - "strings" "time" - "github.com/mitchellh/hashstructure" - "github.com/elastic/beats/v7/libbeat/common/file" ) // State is used to communicate the reading state of a file type State struct { - Id string `json:"-" struct:"-"` // local unique id to make comparison more efficient - Finished bool `json:"-" struct:"-"` // harvester state - Fileinfo os.FileInfo `json:"-" struct:"-"` // the file info - Source string `json:"source" struct:"source"` - Offset int64 `json:"offset" struct:"offset"` - Timestamp time.Time `json:"timestamp" struct:"timestamp"` - TTL time.Duration `json:"ttl" struct:"ttl"` - Type string `json:"type" struct:"type"` - Meta map[string]string `json:"meta" struct:"meta,omitempty"` - FileStateOS file.StateOS `json:"FileStateOS" struct:"FileStateOS"` + Id string `json:"id" struct:"id"` + PrevId string `json:"prev_id" struct:"prev_id"` + Finished bool `json:"-" struct:"-"` // harvester state + Fileinfo os.FileInfo `json:"-" struct:"-"` // the file info + Source string `json:"source" struct:"source"` + Offset int64 `json:"offset" struct:"offset"` + Timestamp time.Time `json:"timestamp" struct:"timestamp"` + TTL time.Duration `json:"ttl" struct:"ttl"` + Type string `json:"type" struct:"type"` + Meta map[string]string `json:"meta" struct:"meta,omitempty"` + FileStateOS file.StateOS `json:"FileStateOS" struct:"FileStateOS"` + IdentifierName string `json:"identifier_name" struct:"identifier_name"` } // NewState creates a new file state -func 
NewState(fileInfo os.FileInfo, path string, t string, meta map[string]string) State { +func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]string, identifier StateIdentifier) State { if len(meta) == 0 { meta = nil } - return State{ + + s := State{ Fileinfo: fileInfo, Source: path, Finished: false, @@ -57,43 +56,13 @@ func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]strin Type: t, Meta: meta, } -} - -// ID returns a unique id for the state as a string -func (s *State) ID() string { - // Generate id on first request. This is needed as id is not set when converting back from json - if s.Id == "" { - if len(s.Meta) == 0 { - s.Id = s.FileStateOS.String() - } else { - hashValue, _ := hashstructure.Hash(s.Meta, nil) - var hashBuf [17]byte - hash := strconv.AppendUint(hashBuf[:0], hashValue, 16) - hash = append(hash, '-') - - fileID := s.FileStateOS.String() - var b strings.Builder - b.Grow(len(hash) + len(fileID)) - b.Write(hash) - b.WriteString(fileID) + s.Id, s.IdentifierName = identifier.GenerateID(s) - s.Id = b.String() - } - } - - return s.Id + return s } -// IsEqual compares the state to an other state supporting stringer based on the unique string +// IsEqual checks if the two states point to the same file. func (s *State) IsEqual(c *State) bool { - return s.ID() == c.ID() -} - -// IsEmpty returns true if the state is empty -func (s *State) IsEmpty() bool { - return s.FileStateOS == file.StateOS{} && - s.Source == "" && - len(s.Meta) == 0 && - s.Timestamp.IsZero() + return s.Id == c.Id } diff --git a/filebeat/input/file/states.go b/filebeat/input/file/states.go index 34704b41dba..48cf338f80f 100644 --- a/filebeat/input/file/states.go +++ b/filebeat/input/file/states.go @@ -55,7 +55,7 @@ func (s *States) UpdateWithTs(newState State, ts time.Time) { s.Lock() defer s.Unlock() - id := newState.ID() + id := newState.Id index := s.findPrevious(id) newState.Timestamp = ts @@ -74,13 +74,20 @@ func (s *States) UpdateWithTs(newState State, ts time.Time) { func (s *States) FindPrevious(newState State) State { s.RLock() defer s.RUnlock() - i := s.findPrevious(newState.ID()) + i := s.findPrevious(newState.Id) if i < 0 { return State{} } return s.states[i] } +func (s *States) IsNew(state State) bool { + s.RLock() + defer s.RUnlock() + i := s.findPrevious(state.Id) + return i < 0 +} + // findPrevious returns the previous state for the file. 
// In case no previous state exists, index -1 is returned func (s *States) findPrevious(id string) int { @@ -120,17 +127,16 @@ func (s *States) CleanupWith(fn func(string)) (int, int) { continue } - id := state.ID() - delete(s.idx, id) + delete(s.idx, state.Id) if fn != nil { - fn(id) + fn(state.Id) } logp.Debug("state", "State removed for %v because of older: %v", state.Source, state.TTL) L-- if L != i { s.states[i] = s.states[L] - s.idx[s.states[i].ID()] = i + s.idx[s.states[i].Id] = i } } else { i++ @@ -172,7 +178,7 @@ func (s *States) SetStates(states []State) { // create new index s.idx = map[string]int{} for i := range states { - s.idx[states[i].ID()] = i + s.idx[states[i].Id] = i } } diff --git a/filebeat/input/file/testdata/identifier_marker b/filebeat/input/file/testdata/identifier_marker new file mode 100644 index 00000000000..2effed19113 --- /dev/null +++ b/filebeat/input/file/testdata/identifier_marker @@ -0,0 +1 @@ +1234-1234-1234-1234 diff --git a/filebeat/input/log/config.go b/filebeat/input/log/config.go index 835358b4e66..c5f9f2049da 100644 --- a/filebeat/input/log/config.go +++ b/filebeat/input/log/config.go @@ -27,6 +27,7 @@ import ( cfg "github.com/elastic/beats/v7/filebeat/config" "github.com/elastic/beats/v7/filebeat/harvester" "github.com/elastic/beats/v7/filebeat/input/file" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/cfgwarn" "github.com/elastic/beats/v7/libbeat/common/match" "github.com/elastic/beats/v7/libbeat/logp" @@ -35,43 +36,6 @@ import ( "github.com/elastic/beats/v7/libbeat/reader/readjson" ) -var ( - defaultConfig = config{ - // Common - ForwarderConfig: harvester.ForwarderConfig{ - Type: cfg.DefaultType, - }, - CleanInactive: 0, - - // Input - Enabled: true, - IgnoreOlder: 0, - ScanFrequency: 10 * time.Second, - CleanRemoved: true, - HarvesterLimit: 0, - Symlinks: false, - TailFiles: false, - ScanSort: "", - ScanOrder: "asc", - RecursiveGlob: true, - - // Harvester - BufferSize: 16 * humanize.KiByte, - MaxBytes: 10 * humanize.MiByte, - LineTerminator: readfile.AutoLineTerminator, - LogConfig: LogConfig{ - Backoff: 1 * time.Second, - BackoffFactor: 2, - MaxBackoff: 10 * time.Second, - CloseInactive: 5 * time.Minute, - CloseRemoved: true, - CloseRenamed: false, - CloseEOF: false, - CloseTimeout: 0, - }, - } -) - type config struct { harvester.ForwarderConfig `config:",inline"` LogConfig `config:",inline"` @@ -81,16 +45,17 @@ type config struct { CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"` // Input - Enabled bool `config:"enabled"` - ExcludeFiles []match.Matcher `config:"exclude_files"` - IgnoreOlder time.Duration `config:"ignore_older"` - Paths []string `config:"paths"` - ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` - CleanRemoved bool `config:"clean_removed"` - HarvesterLimit uint32 `config:"harvester_limit" validate:"min=0"` - Symlinks bool `config:"symlinks"` - TailFiles bool `config:"tail_files"` - RecursiveGlob bool `config:"recursive_glob.enabled"` + Enabled bool `config:"enabled"` + ExcludeFiles []match.Matcher `config:"exclude_files"` + IgnoreOlder time.Duration `config:"ignore_older"` + Paths []string `config:"paths"` + ScanFrequency time.Duration `config:"scan_frequency" validate:"min=0,nonzero"` + CleanRemoved bool `config:"clean_removed"` + HarvesterLimit uint32 `config:"harvester_limit" validate:"min=0"` + Symlinks bool `config:"symlinks"` + TailFiles bool `config:"tail_files"` + RecursiveGlob bool `config:"recursive_glob.enabled"` + 
FileIdentity *common.ConfigNamespace `config:"file_identity"` // Harvester BufferSize int `config:"harvester_buffer_size"` @@ -147,6 +112,44 @@ var ValidScanSort = map[string]struct{}{ ScanSortFilename: {}, } +func defaultConfig() config { + return config{ + // Common + ForwarderConfig: harvester.ForwarderConfig{ + Type: cfg.DefaultType, + }, + CleanInactive: 0, + + // Input + Enabled: true, + IgnoreOlder: 0, + ScanFrequency: 10 * time.Second, + CleanRemoved: true, + HarvesterLimit: 0, + Symlinks: false, + TailFiles: false, + ScanSort: "", + ScanOrder: "asc", + RecursiveGlob: true, + FileIdentity: nil, + + // Harvester + BufferSize: 16 * humanize.KiByte, + MaxBytes: 10 * humanize.MiByte, + LineTerminator: readfile.AutoLineTerminator, + LogConfig: LogConfig{ + Backoff: 1 * time.Second, + BackoffFactor: 2, + MaxBackoff: 10 * time.Second, + CloseInactive: 5 * time.Minute, + CloseRemoved: true, + CloseRenamed: false, + CloseEOF: false, + CloseTimeout: 0, + }, + } +} + func (c *config) Validate() error { // DEPRECATED 6.0.0: warning is already outputted on input level if c.InputType != "" { diff --git a/filebeat/input/log/config_test.go b/filebeat/input/log/config_test.go index 7406014d049..f8160a830f7 100644 --- a/filebeat/input/log/config_test.go +++ b/filebeat/input/log/config_test.go @@ -59,7 +59,7 @@ func TestCleanOlderIgnoreOlderErrorEqual(t *testing.T) { func TestCleanOlderIgnoreOlder(t *testing.T) { config := config{ - CleanInactive: 10*time.Hour + defaultConfig.ScanFrequency + 1*time.Second, + CleanInactive: 10*time.Hour + defaultConfig().ScanFrequency + 1*time.Second, IgnoreOlder: 10 * time.Hour, Paths: []string{"hello"}, ForwarderConfig: harvester.ForwarderConfig{ diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go index 94162ebfec9..95043e94237 100644 --- a/filebeat/input/log/harvester.go +++ b/filebeat/input/log/harvester.go @@ -132,7 +132,7 @@ func NewHarvester( } h := &Harvester{ - config: defaultConfig, + config: defaultConfig(), state: state, states: states, publishState: publishState, diff --git a/filebeat/input/log/input.go b/filebeat/input/log/input.go index 032f5c11c92..d7bd4f18017 100644 --- a/filebeat/input/log/input.go +++ b/filebeat/input/log/input.go @@ -60,16 +60,17 @@ func init() { // Input contains the input and its config type Input struct { - cfg *common.Config - config config - states *file.States - harvesters *harvester.Registry - outlet channel.Outleter - stateOutlet channel.Outleter - done chan struct{} - numHarvesters atomic.Uint32 - meta map[string]string - stopOnce sync.Once + cfg *common.Config + config config + states *file.States + harvesters *harvester.Registry + outlet channel.Outleter + stateOutlet channel.Outleter + done chan struct{} + numHarvesters atomic.Uint32 + meta map[string]string + stopOnce sync.Once + fileStateIdentifier file.StateIdentifier } // NewInput instantiates a new Log @@ -85,7 +86,7 @@ func NewInput( } } - inputConfig := defaultConfig + inputConfig := defaultConfig() if err := cfg.Unpack(&inputConfig); err != nil { return nil, err @@ -101,6 +102,11 @@ func NewInput( return nil, fmt.Errorf("each input must have at least one path defined") } + identifier, err := file.NewStateIdentifier(inputConfig.FileIdentity) + if err != nil { + return nil, fmt.Errorf("failed to initialize file identity generator: %+v", err) + } + // Note: underlying output. // The input and harvester do have different requirements // on the timings the outlets must be closed/unblocked. 
@@ -125,14 +131,15 @@ func NewInput( } p := &Input{ - config: inputConfig, - cfg: cfg, - harvesters: harvester.NewRegistry(), - outlet: out, - stateOutlet: stateOut, - states: file.NewStates(), - done: context.Done, - meta: meta, + config: inputConfig, + cfg: cfg, + harvesters: harvester.NewRegistry(), + outlet: out, + stateOutlet: stateOut, + states: file.NewStates(), + done: context.Done, + meta: meta, + fileStateIdentifier: identifier, } // Create empty harvester to check if configs are fine @@ -171,6 +178,15 @@ func (p *Input) loadStates(states []file.State) error { return fmt.Errorf("Can only start an input when all related states are finished: %+v", state) } + // Convert state to current identifier if different + // and remove outdated state + newId, identifierName := p.fileStateIdentifier.GenerateID(state) + if state.IdentifierName != identifierName { + state.PrevId = state.Id + state.Id = newId + state.IdentifierName = identifierName + } + // Update input states and send new states to registry err := p.updateState(state) if err != nil { @@ -225,10 +241,14 @@ func (p *Input) Run() { } } else { // Check if existing source on disk and state are the same. Remove if not the case. - newState := file.NewState(stat, state.Source, p.config.Type, p.meta) - if !newState.FileStateOS.IsSame(state.FileStateOS) { + newState := file.NewState(stat, state.Source, p.config.Type, p.meta, p.fileStateIdentifier) + if state.IdentifierName != newState.IdentifierName { + logp.Debug("input", "file_identity configuration for file has changed from %s to %s, generating new id", state.IdentifierName, newState.IdentifierName) + state.Id, state.IdentifierName = p.fileStateIdentifier.GenerateID(state) + } + if !state.IsEqual(&newState) { p.removeState(state) - logp.Debug("input", "Remove state for file as file removed or renamed: %s", state.Source) + logp.Debug("input", "Remove state of file as its identity has changed: %s", state.Source) } } } @@ -418,7 +438,7 @@ func getFileState(path string, info os.FileInfo, p *Input) (file.State, error) { } logp.Debug("input", "Check file for harvesting: %s", absolutePath) // Create new state for comparison - newState := file.NewState(info, absolutePath, p.config.Type, p.meta) + newState := file.NewState(info, absolutePath, p.config.Type, p.meta, p.fileStateIdentifier) return newState, nil } @@ -476,11 +496,11 @@ func (p *Input) scan() { } // Load last state - lastState := p.states.FindPrevious(newState) + isNewState := p.states.IsNew(newState) // Ignores all files which fall under ignore_older if p.isIgnoreOlder(newState) { - err := p.handleIgnoreOlder(lastState, newState) + err := p.handleIgnoreOlder(isNewState, newState) if err != nil { logp.Err("Updating ignore_older state error: %s", err) } @@ -488,7 +508,7 @@ func (p *Input) scan() { } // Decides if previous state exists - if lastState.IsEmpty() { + if isNewState { logp.Debug("input", "Start harvester for new file: %s", newState.Source) err := p.startHarvester(newState, 0) if err == errHarvesterLimit { @@ -499,6 +519,7 @@ func (p *Input) scan() { logp.Err(harvesterErrMsg, newState.Source, err) } } else { + lastState := p.states.FindPrevious(newState) p.harvestExistingFile(newState, lastState) } } @@ -566,10 +587,11 @@ func (p *Input) harvestExistingFile(newState file.State, oldState file.State) { // handleIgnoreOlder handles states which fall under ignore older // Based on the state information it is decided if the state information has to be updated or not -func (p *Input) handleIgnoreOlder(lastState, newState 
file.State) error { +func (p *Input) handleIgnoreOlder(isNewState bool, newState file.State) error { logp.Debug("input", "Ignore file because ignore_older reached: %s", newState.Source) - if !lastState.IsEmpty() { + if !isNewState { + lastState := p.states.FindPrevious(newState) if !lastState.Finished { logp.Info("File is falling under ignore_older before harvesting is finished. Adjust your close_* settings: %s", newState.Source) } @@ -711,8 +733,26 @@ func (p *Input) updateState(state file.State) error { state.Meta = nil } + err := p.doUpdate(state) + if err != nil { + return err + } + + if state.PrevId != "" { + stateToRemove := file.State{Id: state.PrevId, TTL: 0, Finished: true, Meta: nil} + err := p.doUpdate(stateToRemove) + if err != nil { + return fmt.Errorf("failed to remove outdated states based on prev_id: %v", err) + } + } + + return nil +} + +func (p *Input) doUpdate(state file.State) error { // Update first internal state p.states.Update(state) + ok := p.outlet.OnEvent(beat.Event{ Private: state, }) @@ -720,7 +760,6 @@ func (p *Input) updateState(state file.State) error { logp.Info("input outlet closed") return errors.New("input outlet closed") } - return nil } diff --git a/filebeat/input/log/input_other_test.go b/filebeat/input/log/input_other_test.go index e37b4d0c1f2..0910bd2b291 100644 --- a/filebeat/input/log/input_other_test.go +++ b/filebeat/input/log/input_other_test.go @@ -147,8 +147,9 @@ func TestInit(t *testing.T) { config: config{ Paths: test.paths, }, - states: file.NewStates(), - outlet: TestOutlet{}, + states: file.NewStates(), + outlet: TestOutlet{}, + fileStateIdentifier: &file.MockIdentifier{}, } // Set states to finished diff --git a/filebeat/registrar/migrate.go b/filebeat/registrar/migrate.go index 4a76771878a..16e7b14744f 100644 --- a/filebeat/registrar/migrate.go +++ b/filebeat/registrar/migrate.go @@ -340,10 +340,9 @@ func fixStates(states []file.State) []file.State { state := &states[i] fixState(state) - id := state.ID() - old, exists := idx[id] + old, exists := idx[state.Id] if !exists { - idx[id] = state + idx[state.Id] = state } else { mergeStates(old, state) // overwrite the entry in 'old' } @@ -364,10 +363,16 @@ func fixStates(states []file.State) []file.State { // fixState updates a read state to fullfil required invariantes: // - "Meta" must be nil if len(Meta) == 0 +// - "Id" must be initialized func fixState(st *file.State) { if len(st.Meta) == 0 { st.Meta = nil } + + if len(st.IdentifierName) == 0 { + identifier, _ := file.NewStateIdentifier(nil) + st.Id, st.IdentifierName = identifier.GenerateID(*st) + } } // resetStates sets all states to finished and disable TTL on restart diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index 118f7c276db..0faa8a38890 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -114,7 +114,7 @@ func (r *Registrar) Start() error { // Load the previous log file locations now, for use in input err := r.loadStates() if err != nil { - return fmt.Errorf("Error loading state: %v", err) + return fmt.Errorf("error loading state: %v", err) } r.wg.Add(1) @@ -302,7 +302,7 @@ func readStatesFrom(store *statestore.Store) ([]file.State, error) { func writeStates(store *statestore.Store, states []file.State) error { for i := range states { - key := fileStatePrefix + states[i].ID() + key := fileStatePrefix + states[i].Id if err := store.Set(key, states[i]); err != nil { return err } diff --git a/filebeat/tests/system/test_harvester.py 
b/filebeat/tests/system/test_harvester.py index be2e4f42b8f..cb30dc4976b 100644 --- a/filebeat/tests/system/test_harvester.py +++ b/filebeat/tests/system/test_harvester.py @@ -79,7 +79,7 @@ def test_close_renamed(self): def test_close_removed(self): """ - Checks that a file is closed if removed + Checks that a file is closed if removed with native file identifier """ self.render_config_template( path=os.path.abspath(self.working_dir) + "/log/test.log", diff --git a/filebeat/tests/system/test_input.py b/filebeat/tests/system/test_input.py index 684f4f852af..8aebe49f4d0 100644 --- a/filebeat/tests/system/test_input.py +++ b/filebeat/tests/system/test_input.py @@ -2,7 +2,9 @@ from filebeat import BaseTest import os +import sys import time +import unittest from beat.beat import Proc @@ -682,3 +684,112 @@ def test_input_processing_pipeline_disable_host(self): output = self.read_output() assert "host.name" not in output[0] + + def test_path_based_identity_tracking(self): + """ + Renamed files are picked up again as the path of the file has changed. + """ + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + close_eof="true", + input_raw=" file_identity.path: ~", + ) + + testfile = os.path.join(self.working_dir, "log", "test.log") + self.__write_hello_word_to_test_input_file(testfile) + + proc = self.start_beat() + + # wait until the file is picked up + self.wait_until(lambda: self.output_has(lines=1)) + + renamedfile = os.path.join(self.working_dir, "log", "renamed.log") + os.rename(testfile, renamedfile) + + # wait until the both messages are received by the output + self.wait_until(lambda: self.output_has(lines=2)) + proc.check_kill_and_wait() + + # assert that renaming of the file went undetected + assert not self.log_contains("File rename was detected:" + testfile + " -> " + renamedfile) + + @unittest.skipIf(sys.platform.startswith("win"), "inode_marker is not supported on windows") + def test_inode_marker_based_identity_tracking(self): + """ + File is picked up again if the contents of the marker file changes. + """ + + marker_location = os.path.join(self.working_dir, "marker") + with open(marker_location, 'w') as m: + m.write("very-unique-string") + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + close_eof="true", + input_raw=" file_identity.inode_marker.path: " + marker_location, + ) + + testfile = os.path.join(self.working_dir, "log", "test.log") + self.__write_hello_word_to_test_input_file(testfile) + + proc = self.start_beat() + + # wait until the file is picked up + self.wait_until(lambda: self.log_contains("Start harvester for new file: " + testfile)) + + # change the ID in the marker file to simulate a new file + with open(marker_location, 'w') as m: + m.write("different-very-unique-id") + + self.wait_until(lambda: self.log_contains("Start harvester for new file: " + testfile)) + + # wait until the both messages are received by the output + self.wait_until(lambda: self.output_has(lines=2)) + proc.check_kill_and_wait() + + @unittest.skipIf(sys.platform.startswith("win"), "inode_marker is not supported on windows") + def test_inode_marker_based_identity_tracking_to_path_based(self): + """ + File reading can be continued after file_identity is changed. 
+ """ + + marker_location = os.path.join(self.working_dir, "marker") + with open(marker_location, 'w') as m: + m.write("very-unique-string") + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + input_raw=" file_identity.inode_marker.path: " + marker_location, + ) + + testfile = os.path.join(self.working_dir, "log", "test.log") + self.__write_hello_word_to_test_input_file(testfile) + + proc = self.start_beat() + + # wait until the file is picked up + self.wait_until(lambda: self.log_contains("Start harvester for new file: " + testfile)) + + self.wait_until(lambda: self.output_has(lines=1)) + proc.check_kill_and_wait() + + self.render_config_template( + path=os.path.abspath(self.working_dir) + "/log/*", + rotateonstartup="false", + input_raw=" file_identity.path: ~", + ) + + with open(testfile, 'w+') as f: + f.write("hello world again\n") + + proc = self.start_beat() + + # on startup output is rotated + self.wait_until(lambda: self.output_has(lines=1, output_file="output/filebeat.1")) + self.wait_until(lambda: self.output_has(lines=1)) + proc.check_kill_and_wait() + + def __write_hello_word_to_test_input_file(self, testfile): + os.mkdir(self.working_dir + "/log/") + with open(testfile, 'w') as f: + f.write("hello world\n") diff --git a/libbeat/common/file/file_other.go b/libbeat/common/file/file_other.go index 599108f480b..fa2082da8ac 100644 --- a/libbeat/common/file/file_other.go +++ b/libbeat/common/file/file_other.go @@ -68,3 +68,8 @@ func IsRemoved(f *os.File) bool { _, err := os.Stat(f.Name()) return err != nil } + +// InodeString returns the inode in string. +func (s *StateOS) InodeString() string { + return strconv.FormatUint(s.Inode, 10) +} diff --git a/libbeat/common/file/file_windows.go b/libbeat/common/file/file_windows.go index 1a9ac6e1c76..1b8a9da49de 100644 --- a/libbeat/common/file/file_windows.go +++ b/libbeat/common/file/file_windows.go @@ -146,3 +146,12 @@ func IsRemoved(f *os.File) bool { } return info.DeletePending } + +// InodeString returns idxhi and idxlo as a string. +func (fs *StateOS) InodeString() string { + var buf [61]byte + current := strconv.AppendUint(buf[:0], fs.IdxHi, 10) + current = append(current, '-') + current = strconv.AppendUint(current, fs.IdxLo, 10) + return string(current) +} diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 853eec3f827..879676c267c 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -1545,6 +1545,10 @@ filebeat.inputs: # are matching any regular expression from the list. By default, no files are dropped. #exclude_files: ['.gz$'] + # Method to determine if two files are the same or not. By default + # the Beat considers two files the same if their inode and device id are the same. + #file_identity.native: ~ + # Optional additional fields. These fields can be freely picked # to add additional information to the crawled log files for filtering #fields: