Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick #7281 to 6.3: Keep different registry entry per container stream #7300

Merged
merged 2 commits into from
Jun 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ https://github.com/elastic/beats/compare/v6.2.3...v6.3.0[View commits]
- Commit registry writes to stable storage to avoid corrupt registry files. {pull}6877[6877]
- Fix a parsing issue in the syslog input for RFC3339 timestamp and time with nanoseconds. {pull}7046[7046]
- Fix an issue with an overflowing wait group when using the TCP input. {issue}7202[7202]
- Keep different registry entry per container stream to avoid wrong offsets. {issue}7281[7281]

*Heartbeat*

Expand Down
6 changes: 6 additions & 0 deletions filebeat/input/docker/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ func NewInput(
if err := cfg.SetString("docker-json", -1, config.Containers.Stream); err != nil {
return nil, errors.Wrap(err, "update input config")
}

// Add stream to meta to ensure different state per stream
if config.Containers.Stream != "all" {
context.Meta["stream"] = config.Containers.Stream
}

return log.NewInput(cfg, outletFactory, context)
}

Expand Down
40 changes: 29 additions & 11 deletions filebeat/input/file/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,30 @@ package file

import (
"os"
"strconv"
"time"

"github.com/mitchellh/hashstructure"

"github.com/elastic/beats/libbeat/common/file"
)

// State is used to communicate the reading state of a file
type State struct {
Id string `json:"-"` // local unique id to make comparison more efficient
Finished bool `json:"-"` // harvester state
Fileinfo os.FileInfo `json:"-"` // the file info
Source string `json:"source"`
Offset int64 `json:"offset"`
Timestamp time.Time `json:"timestamp"`
TTL time.Duration `json:"ttl"`
Type string `json:"type"`
Id string `json:"-"` // local unique id to make comparison more efficient

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

struct field Id should be ID

Finished bool `json:"-"` // harvester state
Fileinfo os.FileInfo `json:"-"` // the file info
Source string `json:"source"`
Offset int64 `json:"offset"`
Timestamp time.Time `json:"timestamp"`
TTL time.Duration `json:"ttl"`
Type string `json:"type"`
Meta map[string]string `json:"meta"`
FileStateOS file.StateOS
}

// NewState creates a new file state
func NewState(fileInfo os.FileInfo, path string, t string) State {
func NewState(fileInfo os.FileInfo, path string, t string, meta map[string]string) State {
return State{
Fileinfo: fileInfo,
Source: path,
Expand All @@ -30,15 +34,26 @@ func NewState(fileInfo os.FileInfo, path string, t string) State {
Timestamp: time.Now(),
TTL: -1, // By default, state does have an infinite ttl
Type: t,
Meta: meta,
}
}

// ID returns a unique id for the state as a string.
//
// The id is generated on first request and cached in s.Id. This is needed as
// the id is not set when converting back from json. Without meta the id is the
// string form of FileStateOS; with meta it is the hex hash of the meta map,
// a '-' separator and the string form of FileStateOS, so that states for the
// same file but with different meta get different ids.
func (s *State) ID() string {
	if s.Id != "" {
		return s.Id
	}

	// Use len() instead of a nil check so that a nil Meta and an empty,
	// non-nil Meta yield the same id.
	if len(s.Meta) == 0 {
		s.Id = s.FileStateOS.String()
		return s.Id
	}

	// NOTE(review): the error returned by hashstructure.Hash is discarded, as
	// before; presumably hashing a map[string]string cannot fail - confirm.
	hashValue, _ := hashstructure.Hash(s.Meta, nil)

	// A uint64 needs at most 16 hex digits, plus one byte for the separator.
	var hashBuf [17]byte
	hash := strconv.AppendUint(hashBuf[:0], hashValue, 16)
	hash = append(hash, '-')

	s.Id = string(hash) + s.FileStateOS.String()
	return s.Id
}

Expand All @@ -49,5 +64,8 @@ func (s *State) IsEqual(c *State) bool {

// IsEmpty returns true if the state is empty, i.e. it has a zero FileStateOS,
// an empty source, no meta entries and a zero timestamp.
//
// The fields are checked one by one because State contains a map (Meta) and
// therefore cannot be compared against State{} with ==.
func (s *State) IsEmpty() bool {
	return s.FileStateOS == file.StateOS{} &&
		s.Source == "" &&
		// len() covers both a nil and an empty, non-nil map.
		len(s.Meta) == 0 &&
		s.Timestamp.IsZero()
}
1 change: 1 addition & 0 deletions filebeat/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func New(
Done: input.done,
BeatDone: input.beatDone,
DynamicFields: dynFields,
Meta: map[string]string{},
}
var ipt Input
ipt, err = f(conf, outlet, context)
Expand Down
23 changes: 20 additions & 3 deletions filebeat/input/log/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ type Input struct {
stateOutlet channel.Outleter
done chan struct{}
numHarvesters atomic.Uint32
meta map[string]string
}

// NewInput instantiates a new Log
Expand Down Expand Up @@ -80,6 +81,7 @@ func NewInput(
stateOutlet: stateOut,
states: file.NewStates(),
done: context.Done,
meta: context.Meta,
}

if err := cfg.Unpack(&p.config); err != nil {
Expand Down Expand Up @@ -121,7 +123,7 @@ func (p *Input) loadStates(states []file.State) error {

for _, state := range states {
// Check if state source belongs to this input. If yes, update the state.
if p.matchesFile(state.Source) {
if p.matchesFile(state.Source) && p.matchesMeta(state.Meta) {
state.TTL = -1

// In case an input is tried to be started with an unfinished state matching the glob pattern
Expand Down Expand Up @@ -183,7 +185,7 @@ func (p *Input) Run() {
}
} else {
// Check if existing source on disk and state are the same. Remove if not the case.
newState := file.NewState(stat, state.Source, p.config.Type)
newState := file.NewState(stat, state.Source, p.config.Type, p.meta)
if !newState.FileStateOS.IsSame(state.FileStateOS) {
p.removeState(state)
logp.Debug("input", "Remove state for file as file removed or renamed: %s", state.Source)
Expand Down Expand Up @@ -297,6 +299,21 @@ func (p *Input) matchesFile(filePath string) bool {
return false
}

// matchesMeta returns true in case the given meta is equal to the one of this
// input, false if not.
//
// Two metas are equal when they have the same number of entries and every
// key of the input's meta is present in the given meta with the same value.
// Only lengths and entries are compared, so a nil map equals an empty map.
func (p *Input) matchesMeta(meta map[string]string) bool {
	if len(meta) != len(p.meta) {
		return false
	}

	for k, want := range p.meta {
		// Use the two-value lookup so that a key missing from meta is not
		// mistaken for a key that is present with an empty value.
		if got, ok := meta[k]; !ok || got != want {
			return false
		}
	}

	return true
}

type FileSortInfo struct {
info os.FileInfo
path string
Expand Down Expand Up @@ -361,7 +378,7 @@ func getFileState(path string, info os.FileInfo, p *Input) (file.State, error) {
}
logp.Debug("input", "Check file for harvesting: %s", absolutePath)
// Create new state for comparison
newState := file.NewState(info, absolutePath, p.config.Type)
newState := file.NewState(info, absolutePath, p.config.Type, p.meta)
return newState, nil
}

Expand Down
55 changes: 55 additions & 0 deletions filebeat/input/log/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,61 @@ func TestIsCleanInactive(t *testing.T) {
}
}

// TestMatchesMeta runs matchesMeta against an identical meta, a meta with
// fewer entries, a meta with more entries and a pair of empty metas.
func TestMatchesMeta(t *testing.T) {
	type testCase struct {
		Input  *Input
		Meta   map[string]string
		Result bool
	}

	cases := []testCase{
		// Same single entry on both sides.
		{
			Input:  &Input{meta: map[string]string{"it": "matches"}},
			Meta:   map[string]string{"it": "matches"},
			Result: true,
		},
		// The input carries an entry the given meta lacks.
		{
			Input: &Input{meta: map[string]string{
				"it":     "doesnt",
				"doesnt": "match",
			}},
			Meta:   map[string]string{"it": "doesnt"},
			Result: false,
		},
		// The given meta carries an entry the input lacks.
		{
			Input: &Input{meta: map[string]string{"it": "doesnt"}},
			Meta: map[string]string{
				"it":     "doesnt",
				"doesnt": "match",
			},
			Result: false,
		},
		// Both sides empty.
		{
			Input:  &Input{meta: map[string]string{}},
			Meta:   map[string]string{},
			Result: true,
		},
	}

	for i := range cases {
		tc := cases[i]
		got := tc.Input.matchesMeta(tc.Meta)
		assert.Equal(t, tc.Result, got)
	}
}

type TestFileInfo struct {
time time.Time
}
Expand Down
1 change: 1 addition & 0 deletions filebeat/input/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type Context struct {
Done chan struct{}
BeatDone chan struct{}
DynamicFields *common.MapStrPointer
Meta map[string]string
}

type Factory = func(config *common.Config, outletFactory channel.Factory, context Context) (Input, error)
Expand Down
55 changes: 55 additions & 0 deletions filebeat/tests/system/test_registrar.py
Original file line number Diff line number Diff line change
Expand Up @@ -1506,3 +1506,58 @@ def test_registrar_files_with_input_level_processors(self):
"inode": stat.st_ino,
"device": stat.st_dev,
}, file_state_os)

def test_registrar_meta(self):
"""
Check that multiple entries for the same file are on the registry when they have
different meta
"""

self.render_config_template(
type='docker',
input_raw='''
containers:
path: {path}
stream: stdout
ids:
- container_id
- type: docker
containers:
path: {path}
stream: stderr
ids:
- container_id
'''.format(path=os.path.abspath(self.working_dir) + "/log/")
)
os.mkdir(self.working_dir + "/log/")
os.mkdir(self.working_dir + "/log/container_id")
testfile_path1 = self.working_dir + "/log/container_id/test.log"

with open(testfile_path1, 'w') as f:
for i in range(0, 10):
f.write('{"log":"hello\\n","stream":"stdout","time":"2018-04-13T13:39:57.924216596Z"}\n')
f.write('{"log":"hello\\n","stream":"stderr","time":"2018-04-13T13:39:57.924216596Z"}\n')

filebeat = self.start_beat()

self.wait_until(
lambda: self.output_has(lines=20),
max_timeout=15)

# wait until the registry file exists. Needed to avoid a race between
# the logging and the actual writing of the file. Seems to happen on Windows.

self.wait_until(
lambda: os.path.isfile(os.path.join(self.working_dir,
"registry")),
max_timeout=1)

filebeat.check_kill_and_wait()

# Check registry contains 2 entries with meta
data = self.get_registry()
assert len(data) == 2
assert data[0]["source"] == data[1]["source"]
assert data[0]["meta"]["stream"] in ("stdout", "stderr")
assert data[1]["meta"]["stream"] in ("stdout", "stderr")
assert data[0]["meta"]["stream"] != data[1]["meta"]["stream"]
2 changes: 1 addition & 1 deletion filebeat/util/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func (d *Data) GetState() file.State {

// HasState returns true if the data object contains state data.
//
// Emptiness is delegated to State.IsEmpty rather than comparing against
// file.State{} with ==, because State carries a map field and structs
// containing maps are not comparable.
func (d *Data) HasState() bool {
	return !d.state.IsEmpty()
}

// GetEvent returns the event in the data object
Expand Down