Max bytes & Reuse Harvester #1

Merged · 2 commits · Jul 20, 2021
filebeat/input/log/config.go (13 changes: 9 additions & 4 deletions)

@@ -55,6 +55,9 @@ var (
 		RecursiveGlob: true,
 
 		// Harvester
+		ReuseHarvester: true,
+		ReuseMaxBytes:  100 * humanize.MiByte,
+
 		BufferSize: 16 * humanize.KiByte,
 		MaxBytes:   10 * humanize.MiByte,
 		LogConfig: LogConfig{
@@ -91,10 +94,12 @@ type config struct {
 	RecursiveGlob bool `config:"recursive_glob.enabled"`
 
 	// Harvester
-	BufferSize int    `config:"harvester_buffer_size"`
-	Encoding   string `config:"encoding"`
-	ScanOrder  string `config:"scan.order"`
-	ScanSort   string `config:"scan.sort"`
+	ReuseHarvester bool   `config:"reuse_harvester"`
+	ReuseMaxBytes  int64  `config:"reuse_max_bytes"`
+	BufferSize     int    `config:"harvester_buffer_size"`
+	Encoding       string `config:"encoding"`
+	ScanOrder      string `config:"scan.order"`
+	ScanSort       string `config:"scan.sort"`
 
 	ExcludeLines []match.Matcher `config:"exclude_lines"`
 	IncludeLines []match.Matcher `config:"include_lines"`
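For orientation: per the struct tags above, the two new fields surface in filebeat.yml as reuse_harvester and reuse_max_bytes. A minimal, illustrative sketch of how such keys unpack through libbeat's config wrapper; the map literal stands in for a log input section, and reuseConfig is a hypothetical stand-in for the real config struct:

package main

import (
	"fmt"

	"github.com/elastic/beats/libbeat/common"
)

// reuseConfig is a hypothetical stand-in mirroring the two fields
// added to the log input config in this diff.
type reuseConfig struct {
	ReuseHarvester bool  `config:"reuse_harvester"`
	ReuseMaxBytes  int64 `config:"reuse_max_bytes"`
}

func main() {
	// Stands in for a `- type: log` input section in filebeat.yml.
	raw, err := common.NewConfigFrom(map[string]interface{}{
		"reuse_harvester": true,
		"reuse_max_bytes": 100 * 1024 * 1024, // 100 MiB, the new default
	})
	if err != nil {
		panic(err)
	}

	cfg := reuseConfig{}
	if err := raw.Unpack(&cfg); err != nil {
		panic(err)
	}
	fmt.Printf("reuse_harvester=%v reuse_max_bytes=%d\n",
		cfg.ReuseHarvester, cfg.ReuseMaxBytes)
}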
filebeat/input/log/harvester.go (234 changes: 26 additions & 208 deletions)

@@ -29,32 +29,23 @@
 package log
 
 import (
-	"bytes"
 	"errors"
 	"fmt"
+	file_helper "github.com/elastic/beats/libbeat/common/file"
 	"io"
 	"os"
 	"sync"
 	"time"
 
-	"github.com/gofrs/uuid"
-	"golang.org/x/text/transform"
-
 	"github.com/elastic/beats/libbeat/beat"
 	"github.com/elastic/beats/libbeat/common"
-	file_helper "github.com/elastic/beats/libbeat/common/file"
 	"github.com/elastic/beats/libbeat/logp"
 	"github.com/elastic/beats/libbeat/monitoring"
+	"github.com/gofrs/uuid"
 
 	"github.com/elastic/beats/filebeat/channel"
 	"github.com/elastic/beats/filebeat/harvester"
 	"github.com/elastic/beats/filebeat/input/file"
 	"github.com/elastic/beats/filebeat/util"
-	"github.com/elastic/beats/libbeat/reader"
-	"github.com/elastic/beats/libbeat/reader/debug"
-	"github.com/elastic/beats/libbeat/reader/multiline"
-	"github.com/elastic/beats/libbeat/reader/readfile"
-	"github.com/elastic/beats/libbeat/reader/readfile/encoding"
-	"github.com/elastic/beats/libbeat/reader/readjson"
 )

@@ -71,6 +62,7 @@ var (
 	ErrRemoved      = errors.New("file was removed")
 	ErrInactive     = errors.New("file inactive")
 	ErrClosed       = errors.New("reader closed")
+	ErrReadTimeout  = errors.New("reader timeout")
 )
 
 // OutletFactory provides an outlet for the harvester
@@ -80,7 +72,6 @@ type OutletFactory func() channel.Outleter
 type Harvester struct {
 	id     uuid.UUID
 	config config
-	source harvester.Source // the source being watched
 
 	// shutdown handling
 	done chan struct{}
@@ -91,12 +82,9 @@ type Harvester struct {
 	// internal harvester state
 	state  file.State
 	states *file.States
-	log    *Log
 
 	// file reader pipeline
-	reader          reader.Reader
-	encodingFactory encoding.EncodingFactory
-	encoding        encoding.Encoding
+	reader *ReuseHarvester
 
 	// event/state publishing
 	outletFactory OutletFactory
@@ -134,55 +122,23 @@ func NewHarvester(
 		return nil, err
 	}
 
-	encodingFactory, ok := encoding.FindEncoding(h.config.Encoding)
-	if !ok || encodingFactory == nil {
-		return nil, fmt.Errorf("unknown encoding('%v')", h.config.Encoding)
-	}
-	h.encodingFactory = encodingFactory
-
 	// Add ttl if clean_inactive is set
 	if h.config.CleanInactive > 0 {
 		h.state.TTL = h.config.CleanInactive
 	}
 
 	// Add outlet signal so harvester can also stop itself
 	return h, nil
 }
 
-// open does open the file given under h.Path and assigns the file handler to h.log
-func (h *Harvester) open() error {
-	switch h.config.Type {
-	case harvester.StdinType:
-		return h.openStdin()
-	case harvester.LogType:
-		return h.openFile()
-	case harvester.DockerType:
-		return h.openFile()
-	default:
-		return fmt.Errorf("Invalid harvester type: %+v", h.config)
-	}
-}
-
 // ID returns the unique harvester identifier
 func (h *Harvester) ID() uuid.UUID {
 	return h.id
 }
 
 // Setup opens the file handler and creates the reader for the harvester
 func (h *Harvester) Setup() error {
-	err := h.open()
-	if err != nil {
-		return fmt.Errorf("Harvester setup failed. Unexpected file opening error: %s", err)
-	}
-
-	h.reader, err = h.newLogFileReader()
+	//init reuse reader
+	var err error
+	h.reader, err = NewReuseHarvester(h.id, h.config, h.state)
 	if err != nil {
-		if h.source != nil {
-			h.source.Close()
-		}
-		return fmt.Errorf("Harvester setup failed. Unexpected encoding line reader error: %s", err)
+		return fmt.Errorf("harvester init failed. Unexpected encoding line reader error: %s", err)
 	}
 
 	return nil
 }

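Note: the ReuseHarvester type itself is added elsewhere in this PR and is not part of this file's diff. Inferred from its call sites here (Setup above, Run and getState below), the shared reader must expose roughly the following surface inside package log. This interface is a deduction for the reader's benefit, not the PR's actual definition:

// reuseReader captures the surface the Harvester relies on, inferred
// from the call sites in this diff. The real *ReuseHarvester may differ.
type reuseReader interface {
	// Next yields the next message, like the old reader.Reader pipeline.
	Next() (reader.Message, error)

	// Stop detaches this harvester; presumably the underlying file
	// handle is closed once the last attached harvester stops.
	Stop()

	// HasState reports whether the source carries file state
	// (replacing the old h.source.HasState() calls).
	HasState() bool

	// GetState returns the reader's view of the file state (Source, TTL,
	// Fileinfo), which getState below merges into the harvester state.
	GetState() file.State
}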
@@ -214,6 +170,9 @@ func (h *Harvester) Run() error {
 	}
 
 	defer func() {
+		// Close reader
+		h.reader.Stop()
+
 		// Channel to stop internal harvester routines
 		h.stop()

@@ -248,7 +207,6 @@ func (h *Harvester) Run() error {
 		}
 
 		h.stop()
-		h.log.Close()
 	}(h.state.Source)
 
 	logp.Info("Harvester started for file: %s, offset: %d", h.state.Source, h.state.Offset)
@@ -283,12 +241,6 @@ func (h *Harvester) Run() error {
 			return nil
 		}
 
-		// Strip UTF-8 BOM if beginning of file
-		// As all BOMS are converted to UTF-8 it is enough to only remove this one
-		if h.state.Offset == 0 {
-			message.Content = bytes.Trim(message.Content, "\xef\xbb\xbf")
-		}
-
 		// Get copy of state to work on
 		// This is important in case sending is not successful so on shutdown
 		// the old offset is reported
@@ -297,7 +249,7 @@

 		// Create state event
 		data := util.NewData()
-		if h.source.HasState() {
+		if h.reader.HasState() {
 			data.SetState(state)
 		}

@@ -365,7 +317,7 @@ func (h *Harvester) Stop() {
 // sendEvent sends event to the spooler channel
 // Return false if event was not sent
 func (h *Harvester) sendEvent(data *util.Data, forwarder *harvester.Forwarder) bool {
-	if h.source.HasState() {
+	if h.reader.HasState() {
 		h.states.Update(data.GetState())
 	}
 
@@ -379,7 +331,7 @@ func (h *Harvester) sendEvent(data *util.Data, forwarder *harvester.Forwarder) bool {
 // is started. As soon as the output becomes available again, the finished state is written
 // and processing can continue.
 func (h *Harvester) SendStateUpdate() {
-	if !h.source.HasState() {
+	if !h.reader.HasState() {
 		return
 	}
 
@@ -412,93 +364,6 @@ func (h *Harvester) shouldExportLine(line string) bool {
 	return true
 }
 
-// openFile opens a file and checks for the encoding. In case the encoding cannot be detected
-// or the file cannot be opened because for example of failing read permissions, an error
-// is returned and the harvester is closed. The file will be picked up again the next time
-// the file system is scanned
-func (h *Harvester) openFile() error {
-	f, err := file_helper.ReadOpen(h.state.Source)
-	if err != nil {
-		return fmt.Errorf("Failed opening %s: %s", h.state.Source, err)
-	}
-
-	harvesterOpenFiles.Add(1)
-
-	// Makes sure file handler is also closed on errors
-	err = h.validateFile(f)
-	if err != nil {
-		f.Close()
-		harvesterOpenFiles.Add(-1)
-		return err
-	}
-
-	h.source = File{File: f}
-	return nil
-}
-
-func (h *Harvester) validateFile(f *os.File) error {
-	info, err := f.Stat()
-	if err != nil {
-		return fmt.Errorf("Failed getting stats for file %s: %s", h.state.Source, err)
-	}
-
-	if !info.Mode().IsRegular() {
-		return fmt.Errorf("Tried to open non regular file: %q %s", info.Mode(), info.Name())
-	}
-
-	// Compares the stat of the opened file to the state given by the input. Abort if not match.
-	if !os.SameFile(h.state.Fileinfo, info) {
-		return errors.New("file info is not identical with opened file. Aborting harvesting and retrying file later again")
-	}
-
-	h.encoding, err = h.encodingFactory(f)
-	if err != nil {
-
-		if err == transform.ErrShortSrc {
-			logp.Info("Initialising encoding for '%v' failed due to file being too short", f)
-		} else {
-			logp.Err("Initialising encoding for '%v' failed: %v", f, err)
-		}
-		return err
-	}
-
-	// get file offset. Only update offset if no error
-	offset, err := h.initFileOffset(f)
-	if err != nil {
-		return err
-	}
-
-	logp.Debug("harvester", "Setting offset for file: %s. Offset: %d ", h.state.Source, offset)
-	h.state.Offset = offset
-
-	return nil
-}
-
-func (h *Harvester) initFileOffset(file *os.File) (int64, error) {
-	// continue from last known offset
-	if h.state.Offset > 0 {
-		logp.Debug("harvester", "Set previous offset for file: %s. Offset: %d ", h.state.Source, h.state.Offset)
-		return file.Seek(h.state.Offset, os.SEEK_SET)
-	}
-
-	// get offset from file in case of encoding factory was required to read some data.
-	logp.Debug("harvester", "Setting offset for file based on seek: %s", h.state.Source)
-	return file.Seek(0, os.SEEK_CUR)
-}
-
-// getState returns an updated copy of the harvester state
-func (h *Harvester) getState() file.State {
-	if !h.source.HasState() {
-		return file.State{}
-	}
-
-	state := h.state
-
-	// refreshes the values in State with the values from the harvester itself
-	state.FileStateOS = file_helper.GetOSState(h.state.Fileinfo)
-	return state
-}
-
 func (h *Harvester) cleanup() {
 	// Mark harvester as finished
 	h.state.Finished = true
@@ -508,14 +373,7 @@ func (h *Harvester) cleanup() {

 	// Make sure file is closed as soon as harvester exits
 	// If file was never opened, it can't be closed
-	if h.source != nil {
-
-		// close file handler
-		h.source.Close()
-
-		logp.Debug("harvester", "Closing file: %s", h.state.Source)
-		harvesterOpenFiles.Add(-1)
-
+	if h.reader != nil {
 		// On completion, push offset so we can continue where we left off if we relaunch on the same file
 		// Only send offset if file object was created successfully
 		h.SendStateUpdate()
@@ -526,58 +384,18 @@
 	harvesterClosed.Add(1)
 }
 
-// newLogFileReader creates a new reader to read log files
-//
-// It creates a chain of readers which looks as following:
-//
-//   limit -> (multiline -> timeout) -> strip_newline -> json -> encode -> line -> log_file
-//
-// Each reader on the left, contains the reader on the right and calls `Next()` to fetch more data.
-// At the base of all readers the the log_file reader. That means in the data is flowing in the opposite direction:
-//
-//   log_file -> line -> encode -> json -> strip_newline -> (timeout -> multiline) -> limit
-//
-// log_file implements io.Reader interface and encode reader is an adapter for io.Reader to
-// reader.Reader also handling file encodings. All other readers implement reader.Reader
-func (h *Harvester) newLogFileReader() (reader.Reader, error) {
-	var r reader.Reader
-	var err error
-
-	// TODO: NewLineReader uses additional buffering to deal with encoding and testing
-	// for new lines in input stream. Simple 8-bit based encodings, or plain
-	// don't require 'complicated' logic.
-	h.log, err = NewLog(h.source, h.config.LogConfig)
-	if err != nil {
-		return nil, err
-	}
-
-	reader, err := debug.AppendReaders(h.log)
-	if err != nil {
-		return nil, err
-	}
-
-	r, err = readfile.NewEncodeReader(reader, h.encoding, h.config.BufferSize)
-	if err != nil {
-		return nil, err
-	}
-
-	if h.config.DockerJSON != nil {
-		// Docker json-file format, add custom parsing to the pipeline
-		r = readjson.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.ForceCRI, h.config.DockerJSON.CRIFlags)
-	}
-
-	if h.config.JSON != nil {
-		r = readjson.NewJSONReader(r, h.config.JSON)
-	}
-
-	r = readfile.NewStripNewline(r)
-
-	if h.config.Multiline != nil {
-		r, err = multiline.New(r, "\n", h.config.MaxBytes, h.config.Multiline)
-		if err != nil {
-			return nil, err
-		}
-	}
-
-	return readfile.NewLimitReader(r, h.config.MaxBytes), nil
-}
+// getState returns an updated copy of the harvester state
+func (h *Harvester) getState() file.State {
+	if !h.reader.HasState() {
+		return file.State{}
+	}
+
+	state := h.state
+
+	// refreshes the values in State with the values from the harvester itself
+	fileState := h.reader.GetState()
+	state.Source = fileState.Source
+	state.TTL = fileState.TTL
+	state.FileStateOS = file_helper.GetOSState(fileState.Fileinfo)
+	return state
+}