Skip to content

Commit

Permalink
Implementation of fileProspector (elastic#21479) (elastic#21529)
Browse files Browse the repository at this point in the history
## What does this PR do?

This PR adds the implementation of `fileProspector`. The prospector listens for events from the `FSWatcher` and processes them depending on the type of the event. Possible actions are starting a new Harvester to read from a file, removing an entry from the registry, etc.

(cherry picked from commit d987d10)
  • Loading branch information
kvch authored Oct 5, 2020
1 parent 0b22a3c commit f26aa12
Show file tree
Hide file tree
Showing 5 changed files with 457 additions and 2 deletions.
139 changes: 139 additions & 0 deletions filebeat/input/filestream/identifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package filestream

import (
"fmt"
"os"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/file"
)

// Names of the available file identity generators, plus the pieces used when
// composing the registry keys they produce.
const (
// nativeName is the key under which newINodeDeviceIdentifier is registered.
nativeName = "native"
// pathName is the key under which newPathIdentifier is registered.
pathName = "path"
// inodeMarkerName is the key under which newINodeMarkerIdentifier is registered.
inodeMarkerName = "inode_marker"

// DefaultIdentifierName is the identifier name used by default.
DefaultIdentifierName = nativeName
// identitySep separates the components of a generated source name.
identitySep = "::"
)

var (
// identifierFactories maps an identifier name to the factory that creates
// the matching fileIdentifier. newFileIdentifier looks names up here.
identifierFactories = map[string]identifierFactory{
nativeName: newINodeDeviceIdentifier,
pathName: newPathIdentifier,
inodeMarkerName: newINodeMarkerIdentifier,
}
)

// identifierFactory creates a fileIdentifier from its configuration, or
// returns an error if the identifier cannot be set up.
type identifierFactory func(*common.Config) (fileIdentifier, error)

// fileIdentifier turns file system events into fileSource values that carry
// the name under which a file is tracked.
type fileIdentifier interface {
// GetSource builds the fileSource for the file described by the event.
GetSource(loginp.FSEvent) fileSource
// Name returns the name of the identifier implementation.
Name() string
}

// fileSource implements the Source interface
// It is required to identify and manage file sources.
type fileSource struct {
// info, newPath and oldPath are copied from the FSEvent the source was
// created for.
info os.FileInfo
newPath string
oldPath string

// name is the registry identifier returned by Name.
name string
// identifierGenerator is the name of the fileIdentifier that built this source.
identifierGenerator string
}

// Name returns the registry identifier of the file, as computed by the
// fileIdentifier that created this source.
func (f fileSource) Name() string {
return f.name
}

// newFileIdentifier returns the fileIdentifier selected by the configuration
// namespace. A nil namespace yields the inode/device identifier; a name that
// has no registered factory results in an error.
func newFileIdentifier(ns *common.ConfigNamespace) (fileIdentifier, error) {
	if ns == nil {
		return newINodeDeviceIdentifier(nil)
	}

	name := ns.Name()
	if factory, found := identifierFactories[name]; found {
		return factory(ns.Config())
	}
	return nil, fmt.Errorf("no such file_identity generator: %s", name)
}

// inodeDeviceIdentifier names files after the OS state reported by
// file.GetOSState for the file's info.
type inodeDeviceIdentifier struct {
// name is the identifier name reported by Name and embedded in source names.
name string
}

// newINodeDeviceIdentifier creates the "native" identifier. The configuration
// is not used and the call never fails.
func newINodeDeviceIdentifier(_ *common.Config) (fileIdentifier, error) {
	id := inodeDeviceIdentifier{name: nativeName}
	return &id, nil
}

// GetSource builds the fileSource for the event. The source name has the form
// <plugin>::<identifier>::<os state>, where the last part is the string form
// of file.GetOSState for the event's file info.
func (i *inodeDeviceIdentifier) GetSource(e loginp.FSEvent) fileSource {
	prefix := pluginName + identitySep + i.name + identitySep
	osState := file.GetOSState(e.Info).String()

	src := fileSource{
		info:                e.Info,
		newPath:             e.NewPath,
		oldPath:             e.OldPath,
		identifierGenerator: i.name,
	}
	src.name = prefix + osState
	return src
}

// Name returns the name the identifier was created with.
func (i *inodeDeviceIdentifier) Name() string {
return i.name
}

// pathIdentifier names files after the path reported in the event (NewPath).
type pathIdentifier struct {
// name is the identifier name reported by Name and embedded in source names.
name string
}

// newPathIdentifier creates the "path" identifier. The configuration is not
// used and the call never fails.
func newPathIdentifier(_ *common.Config) (fileIdentifier, error) {
	id := pathIdentifier{name: pathName}
	return &id, nil
}

// GetSource builds the fileSource for the event. The source name has the form
// <plugin>::<identifier>::<new path>.
func (p *pathIdentifier) GetSource(e loginp.FSEvent) fileSource {
	prefix := pluginName + identitySep + p.name + identitySep

	src := fileSource{
		info:                e.Info,
		newPath:             e.NewPath,
		oldPath:             e.OldPath,
		identifierGenerator: p.name,
	}
	src.name = prefix + e.NewPath
	return src
}

// Name returns the name the identifier was created with.
func (p *pathIdentifier) Name() string {
return p.name
}

// MockIdentifier is a fileIdentifier used for testing.
type MockIdentifier struct{}

// GetSource ignores the event and returns a fileSource whose only populated
// field is identifierGenerator, set to "mock".
func (m *MockIdentifier) GetSource(e loginp.FSEvent) fileSource {
return fileSource{identifierGenerator: "mock"}
}

// Name returns "mock".
func (m *MockIdentifier) Name() string { return "mock" }
108 changes: 108 additions & 0 deletions filebeat/input/filestream/identifier_inode_deviceid.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build !windows

package filestream

import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"time"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/file"
"github.com/elastic/beats/v7/libbeat/logp"
)

// inodeMarkerIdentifier names files after their inode combined with the
// contents of a marker file.
type inodeMarkerIdentifier struct {
// log reports problems encountered while accessing the marker file.
log *logp.Logger
// name is the identifier name reported by Name and embedded in source names.
name string
// markerPath is the location of the marker file.
markerPath string

// markerFileLastModifitaion is the recorded modification time of the marker
// file; markerContents compares it with the file's current modification time.
markerFileLastModifitaion time.Time
// markerTxt caches the contents of the marker file.
markerTxt string
}

// newINodeMarkerIdentifier creates the inode_marker identifier from its
// configuration. The required "path" setting points at the marker file, which
// must be stat-able and readable at creation time; its modification time and
// contents are cached in the returned identifier.
func newINodeMarkerIdentifier(cfg *common.Config) (fileIdentifier, error) {
	var settings struct {
		MarkerPath string `config:"path" validate:"required"`
	}
	if err := cfg.Unpack(&settings); err != nil {
		return nil, fmt.Errorf("error while reading configuration of INode + marker file configuration: %v", err)
	}
	markerPath := settings.MarkerPath

	stat, err := os.Stat(markerPath)
	if err != nil {
		return nil, fmt.Errorf("error while opening marker file at %s: %v", markerPath, err)
	}

	raw, err := ioutil.ReadFile(markerPath)
	if err != nil {
		return nil, fmt.Errorf("error while reading marker file at %s: %v", markerPath, err)
	}

	// The logger name carries the marker file's base name.
	logger := logp.NewLogger("inode_marker_identifier_" + filepath.Base(markerPath))

	id := &inodeMarkerIdentifier{
		log:                       logger,
		name:                      inodeMarkerName,
		markerPath:                markerPath,
		markerFileLastModifitaion: stat.ModTime(),
		markerTxt:                 string(raw),
	}
	return id, nil
}

// markerContents returns the text of the marker file, reloading it only when
// the file's modification time is newer than the one recorded for the cached
// text. After a successful reload both the cached text and the recorded
// modification time are updated, so an unchanged file is not read again.
//
// If the marker file cannot be opened, inspected or read, the problem is
// logged and an empty string is returned; the cache is left untouched.
func (i *inodeMarkerIdentifier) markerContents() string {
	f, err := os.Open(i.markerPath)
	if err != nil {
		i.log.Errorf("Failed to open marker file %s: %v", i.markerPath, err)
		return ""
	}
	defer f.Close()

	fi, err := f.Stat()
	if err != nil {
		i.log.Errorf("Failed to fetch file information for %s: %v", i.markerPath, err)
		return ""
	}

	modTime := fi.ModTime()
	if !i.markerFileLastModifitaion.Before(modTime) {
		// Marker file has not changed since the cached text was loaded.
		return i.markerTxt
	}

	// Read through the handle that was just stat'ed, so the contents and the
	// modification time belong to the same file even if the path is replaced
	// concurrently.
	contents, err := ioutil.ReadAll(f)
	if err != nil {
		i.log.Errorf("Error while reading contents of marker file: %v", err)
		return ""
	}

	i.markerTxt = string(contents)
	// Record the new modification time; without this every later call would
	// re-read the file once it had been modified a single time.
	i.markerFileLastModifitaion = modTime

	return i.markerTxt
}

// GetSource builds the fileSource for the event. The source name has the form
// <identifier>::<inode>-<marker contents>, where the inode string comes from
// file.GetOSState and the marker text from markerContents.
func (i *inodeMarkerIdentifier) GetSource(e loginp.FSEvent) fileSource {
	osstate := file.GetOSState(e.Info)
	inode := osstate.InodeString()
	marker := i.markerContents()

	src := fileSource{
		info:                e.Info,
		newPath:             e.NewPath,
		oldPath:             e.OldPath,
		identifierGenerator: i.name,
	}
	src.name = i.name + identitySep + inode + "-" + marker
	return src
}

// Name returns the name the identifier was created with.
func (i *inodeMarkerIdentifier) Name() string {
return i.name
}
30 changes: 30 additions & 0 deletions filebeat/input/filestream/identifier_inode_deviceid_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build windows

package filestream

import (
"fmt"

"github.com/elastic/beats/v7/libbeat/common"
)

// newINodeMarkerIdentifier is the Windows variant of the inode_marker
// identifier factory. The identity is not available on this platform, so the
// configuration is ignored and an error is always returned.
func newINodeMarkerIdentifier(cfg *common.Config) (fileIdentifier, error) {
	// Report the identity by the name it is registered under (inode_marker),
	// not by the name of the source file it lives in.
	return nil, fmt.Errorf("inode_marker is not supported on Windows")
}
6 changes: 6 additions & 0 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ import (
// are actively written by other applications.
type filestream struct{}

// state describes the reading state kept for a file. The struct tags define
// the field names used when the value is serialized.
type state struct {
// Source identifies the file the state belongs to.
Source string `json:"source" struct:"source"`
// Offset is presumably the position reached in the file — TODO confirm against the reader.
Offset int64 `json:"offset" struct:"offset"`
// IdentifierName is presumably the name of the file identifier that produced Source — TODO confirm.
IdentifierName string `json:"identifier_name" struct:"identifier_name"`
}

// pluginName is the name of the filestream input plugin.
const pluginName = "filestream"

// Plugin creates a new filestream input plugin for creating a stateful input.
Expand Down
Loading

0 comments on commit f26aa12

Please sign in to comment.