Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix async publisher sending empty events #2455

Merged
merged 1 commit into from
Sep 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ https://github.com/elastic/beats/compare/v5.0.0-alpha5...master[Check the HEAD d

*Filebeat*
- Fix processor failure in Filebeat when using regex, contain, or equals with the message field. {issue}2178[2178]
- Fix async publisher sending empty events {pull}2455[2455]

*Winlogbeat*
- Fix corrupt registry file that occurs on power loss by disabling file write caching. {issue}2313[2313]
Expand Down
6 changes: 6 additions & 0 deletions filebeat/input/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ func (f *Event) ToMapStr() common.MapStr {
return event
}

// HasData returns true if the event itself contains data
// Events without data are only state updates
func (e *Event) HasData() bool {
return e.Bytes > 0
}

// mergeJSONFields writes the JSON fields in the event map,
// respecting the KeysUnderRoot and OverwriteKeys configuration options.
// If MessageKey is defined, the Text value from the event always
Expand Down
12 changes: 8 additions & 4 deletions filebeat/publish/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (p *syncLogPublisher) Start() {
pubEvents := make([]common.MapStr, 0, len(events))
for _, event := range events {
// Only send event with bytes read. 0 Bytes means state update only
if event.Bytes > 0 {
if event.HasData() {
pubEvents = append(pubEvents, event.ToMapStr())
}
}
Expand Down Expand Up @@ -172,9 +172,12 @@ func (p *asyncLogPublisher) Start() {
case <-p.done:
return
case events := <-p.in:
pubEvents := make([]common.MapStr, len(events))
for i, event := range events {
pubEvents[i] = event.ToMapStr()

pubEvents := make([]common.MapStr, 0, len(events))
for _, event := range events {
if event.HasData() {
pubEvents = append(pubEvents, event.ToMapStr())
}
}

batch := &eventsBatch{
Expand All @@ -185,6 +188,7 @@ func (p *asyncLogPublisher) Start() {
publisher.Signal(batch), publisher.Guaranteed)

p.active.append(batch)

case <-ticker.C:
}

Expand Down
1 change: 1 addition & 0 deletions filebeat/tests/system/config/filebeat.yml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ filebeat.idle_timeout: 0.1s
{% if not skip_registry_config %}
filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}}
{%endif%}
filebeat.publish_async: {{publish_async}}


#================================ General =====================================
Expand Down
60 changes: 60 additions & 0 deletions filebeat/tests/system/test_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from filebeat import BaseTest

import os
import platform
import time
import shutil
import json
from nose.plugins.skip import Skip, SkipTest


# Additional tests: to be implemented
# * Check if registrar file can be configured, set config param
# * Check "updating" of registrar file
# * Check what happens when registrar file is deleted


class Test(BaseTest):

def test_registrar_file_content(self):
"""
Check if registrar file is created correctly and content is as expected
"""

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
publish_async=True
)
os.mkdir(self.working_dir + "/log/")

filebeat = self.start_beat()

testfile = self.working_dir + "/log/test.log"
file = open(testfile, 'w')

iterations = 5
for n in range(0, iterations):
file.write("line " + str(n+1))
file.write("\n")

file.close()

# Let it read the file
self.wait_until(
lambda: self.output_has(lines=iterations), max_timeout=10)

self.wait_until(
lambda: self.output_has(lines=iterations), max_timeout=10)

# Wait until registry file is written
self.wait_until(
lambda: self.log_contains(
"Registry file updated."),
max_timeout=15)

filebeat.check_kill_and_wait()

data = self.get_registry()
assert len(data) == 1
assert self.output_has(lines=iterations)