Skip to content

Commit

Permalink
Cherry-pick #7281 to 6.3: Keep different registry entry per container…
Browse files Browse the repository at this point in the history
… stream (#7300)

Cherry-pick of PR #7281 to 6.3 branch. Original message: 

This PR adds a metadata map to prospectors state. Adding metadata to states allows having a different state per unique metadata for the same file.

This change is used by the `docker` prospector to have different states per stream, as some users configure a container prospector per stream (`stdout` & `stderr`).

Probably fixes #7045
  • Loading branch information
exekias authored and Steffen Siering committed Jun 14, 2018
1 parent 9a51f38 commit 5383e48
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 15 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ https://github.com/elastic/beats/compare/v6.2.3...v6.3.0[View commits]
- Commit registry writes to stable storage to avoid corrupt registry files. {pull}6877[6877]
- Fix a parsing issue in the syslog input for RFC3339 timestamp and time with nanoseconds. {pull}7046[7046]
- Fix an issue with an overflowing wait group when using the TCP input. {issue}7202[7202]
- Keep different registry entry per container stream to avoid wrong offsets. {issue}7281[7281]
*Heartbeat*
Expand Down
6 changes: 6 additions & 0 deletions filebeat/input/docker/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ func NewInput(
if err := cfg.SetString("docker-json", -1, config.Containers.Stream); err != nil {
return nil, errors.Wrap(err, "update input config")
}

// Add stream to meta to ensure different state per stream
if config.Containers.Stream != "all" {
context.Meta["stream"] = config.Containers.Stream
}

return log.NewInput(cfg, outletFactory, context)
}

Expand Down
40 changes: 29 additions & 11 deletions filebeat/input/file/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,30 @@ package file

import (
"os"
"strconv"
"time"

"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/libbeat/common/file"
)

// State is used to communicate the reading state of a file
type State struct {
Id string `json:"-"` // local unique id to make comparison more efficient
Finished bool `json:"-"` // harvester state
Fileinfo os.FileInfo `json:"-"` // the file info
Source string `json:"source"`
Offset int64 `json:"offset"`
Timestamp time.Time `json:"timestamp"`
TTL time.Duration `json:"ttl"`
Type string `json:"type"`
Id string `json:"-"` // local unique id to make comparison more efficient
Finished bool `json:"-"` // harvester state
Fileinfo os.FileInfo `json:"-"` // the file info
Source string `json:"source"`
Offset int64 `json:"offset"`
Timestamp time.Time `json:"timestamp"`
TTL time.Duration `json:"ttl"`
Type string `json:"type"`
Meta map[string]string `json:"meta"`
FileStateOS file.StateOS
}

// NewState creates a new file state
func NewState(fileInfo os.FileInfo, path string, t string) State {
func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]string) State {
return State{
Fileinfo: fileInfo,
Source: path,
Expand All @@ -30,15 +34,26 @@ func NewState(fileInfo os.FileInfo, path string, t string) State {
Timestamp: time.Now(),
TTL: -1, // By default, state does have an infinite ttl
Type: t,
Meta: meta,
}
}

// ID returns a unique id for the state as a string
func (s *State) ID() string {
// Generate id on first request. This is needed as id is not set when converting back from json
if s.Id == "" {
s.Id = s.FileStateOS.String()
if s.Meta == nil {
s.Id = s.FileStateOS.String()
} else {
hashValue, _ := hashstructure.Hash(s.Meta, nil)
var hashBuf [17]byte
hash := strconv.AppendUint(hashBuf[:0], hashValue, 16)
hash = append(hash, '-')

s.Id = string(hash) + s.FileStateOS.String()
}
}

return s.Id
}

Expand All @@ -49,5 +64,8 @@ func (s *State) IsEqual(c *State) bool {

// IsEmpty returns true if the state is empty
func (s *State) IsEmpty() bool {
return *s == State{}
return s.FileStateOS == file.StateOS{} &&
s.Source == "" &&
s.Meta == nil &&
s.Timestamp.IsZero()
}
1 change: 1 addition & 0 deletions filebeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func New(
Done: input.done,
BeatDone: input.beatDone,
DynamicFields: dynFields,
Meta: map[string]string{},
}
var ipt Input
ipt, err = f(conf, outlet, context)
Expand Down
23 changes: 20 additions & 3 deletions filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Input struct {
stateOutlet channel.Outleter
done chan struct{}
numHarvesters atomic.Uint32
meta map[string]string
}

// NewInput instantiates a new Log
Expand Down Expand Up @@ -80,6 +81,7 @@ func NewInput(
stateOutlet: stateOut,
states: file.NewStates(),
done: context.Done,
meta: context.Meta,
}

if err := cfg.Unpack(&p.config); err != nil {
Expand Down Expand Up @@ -121,7 +123,7 @@ func (p *Input) loadStates(states []file.State) error {

for _, state := range states {
// Check if state source belongs to this input. If yes, update the state.
if p.matchesFile(state.Source) {
if p.matchesFile(state.Source) && p.matchesMeta(state.Meta) {
state.TTL = -1

// In case a input is tried to be started with an unfinished state matching the glob pattern
Expand Down Expand Up @@ -183,7 +185,7 @@ func (p *Input) Run() {
}
} else {
// Check if existing source on disk and state are the same. Remove if not the case.
newState := file.NewState(stat, state.Source, p.config.Type)
newState := file.NewState(stat, state.Source, p.config.Type, p.meta)
if !newState.FileStateOS.IsSame(state.FileStateOS) {
p.removeState(state)
logp.Debug("input", "Remove state for file as file removed or renamed: %s", state.Source)
Expand Down Expand Up @@ -297,6 +299,21 @@ func (p *Input) matchesFile(filePath string) bool {
return false
}

// matchesMeta returns true in case the given meta is equal to the one of this input, false if not
func (p *Input) matchesMeta(meta map[string]string) bool {
if len(meta) != len(p.meta) {
return false
}

for k, v := range p.meta {
if meta[k] != v {
return false
}
}

return true
}

type FileSortInfo struct {
info os.FileInfo
path string
Expand Down Expand Up @@ -361,7 +378,7 @@ func getFileState(path string, info os.FileInfo, p *Input) (file.State, error) {
}
logp.Debug("input", "Check file for harvesting: %s", absolutePath)
// Create new state for comparison
newState := file.NewState(info, absolutePath, p.config.Type)
newState := file.NewState(info, absolutePath, p.config.Type, p.meta)
return newState, nil
}

Expand Down
55 changes: 55 additions & 0 deletions filebeat/input/log/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,61 @@ func TestIsCleanInactive(t *testing.T) {
}
}

func TestMatchesMeta(t *testing.T) {
tests := []struct {
Input *Input
Meta map[string]string
Result bool
}{
{
Input: &Input{
meta: map[string]string{
"it": "matches",
},
},
Meta: map[string]string{
"it": "matches",
},
Result: true,
},
{
Input: &Input{
meta: map[string]string{
"it": "doesnt",
"doesnt": "match",
},
},
Meta: map[string]string{
"it": "doesnt",
},
Result: false,
},
{
Input: &Input{
meta: map[string]string{
"it": "doesnt",
},
},
Meta: map[string]string{
"it": "doesnt",
"doesnt": "match",
},
Result: false,
},
{
Input: &Input{
meta: map[string]string{},
},
Meta: map[string]string{},
Result: true,
},
}

for _, test := range tests {
assert.Equal(t, test.Result, test.Input.matchesMeta(test.Meta))
}
}

type TestFileInfo struct {
time time.Time
}
Expand Down
1 change: 1 addition & 0 deletions filebeat/input/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Context struct {
Done chan struct{}
BeatDone chan struct{}
DynamicFields *common.MapStrPointer
Meta map[string]string
}

type Factory = func(config *common.Config, outletFactory channel.Factory, context Context) (Input, error)
Expand Down
55 changes: 55 additions & 0 deletions filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -1506,3 +1506,58 @@ def test_registrar_files_with_input_level_processors(self):
"inode": stat.st_ino,
"device": stat.st_dev,
}, file_state_os)

def test_registrar_meta(self):
"""
Check that multiple entries for the same file are on the registry when they have
different meta
"""

self.render_config_template(
type='docker',
input_raw='''
containers:
path: {path}
stream: stdout
ids:
- container_id
- type: docker
containers:
path: {path}
stream: stderr
ids:
- container_id
'''.format(path=os.path.abspath(self.working_dir) + "/log/")
)
os.mkdir(self.working_dir + "/log/")
os.mkdir(self.working_dir + "/log/container_id")
testfile_path1 = self.working_dir + "/log/container_id/test.log"

with open(testfile_path1, 'w') as f:
for i in range(0, 10):
f.write('{"log":"hello\\n","stream":"stdout","time":"2018-04-13T13:39:57.924216596Z"}\n')
f.write('{"log":"hello\\n","stream":"stderr","time":"2018-04-13T13:39:57.924216596Z"}\n')

filebeat = self.start_beat()

self.wait_until(
lambda: self.output_has(lines=20),
max_timeout=15)

# wait until the registry file exist. Needed to avoid a race between
# the logging and actual writing the file. Seems to happen on Windows.

self.wait_until(
lambda: os.path.isfile(os.path.join(self.working_dir,
"registry")),
max_timeout=1)

filebeat.check_kill_and_wait()

# Check registry contains 2 entries with meta
data = self.get_registry()
assert len(data) == 2
assert data[0]["source"] == data[1]["source"]
assert data[0]["meta"]["stream"] in ("stdout", "stderr")
assert data[1]["meta"]["stream"] in ("stdout", "stderr")
assert data[0]["meta"]["stream"] != data[1]["meta"]["stream"]
2 changes: 1 addition & 1 deletion filebeat/util/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (d *Data) GetState() file.State {

// HasState returns true if the data object contains state data
func (d *Data) HasState() bool {
return d.state != file.State{}
return !d.state.IsEmpty()
}

// GetEvent returns the event in the data object
Expand Down

0 comments on commit 5383e48

Please sign in to comment.