Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update libbeat publisher pipeline #4492

Merged
merged 3 commits into from
Jun 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -3042,7 +3042,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

--------------------------------------------------------------------
Dependency: github.com/urso/go-structform
Revision: 12cbde40ef8e75dd5ead25c50333262463a89574
Revision: fc6abfdbae53e185870094bc210b42a0f65f6176
License type (autodetected): Apache License 2.0
./vendor/github.com/urso/go-structform/LICENSE:
--------------------------------------------------------------------
Expand Down
7 changes: 3 additions & 4 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
// loadModulesPipelines is called when modules are configured to do the initial
// setup.
func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
esConfig := b.Config.Output["elasticsearch"]
if esConfig == nil || !esConfig.Enabled() {
if b.Config.Output.Name() != "elasticsearch" {
logp.Warn("Filebeat is unable to load the Ingest Node pipelines for the configured" +
" modules because the Elasticsearch output is not configured/enabled. If you have" +
" already loaded the Ingest Node pipelines or are using Logstash pipelines, you" +
Expand All @@ -120,13 +119,13 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
func (fb *Filebeat) loadModulesML(b *beat.Beat) error {
logp.Debug("machine-learning", "Setting up ML jobs for modules")

esConfig := b.Config.Output["elasticsearch"]
if esConfig == nil || !esConfig.Enabled() {
if b.Config.Output.Name() != "elasticsearch" {
logp.Warn("Filebeat is unable to load the Xpack Machine Learning configurations for the" +
" modules because the Elasticsearch output is not configured/enabled.")
return nil
}

esConfig := b.Config.Output.Config()
esClient, err := elasticsearch.NewConnectedClient(esConfig)
if err != nil {
return errors.Errorf("Error creating Elasticsearch client: %v", err)
Expand Down
2 changes: 1 addition & 1 deletion filebeat/publisher/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
)

type asyncLogPublisher struct {
Expand Down
2 changes: 1 addition & 1 deletion filebeat/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion filebeat/publisher/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (

"github.com/elastic/beats/filebeat/util"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
)

type syncLogPublisher struct {
Expand Down
7 changes: 4 additions & 3 deletions filebeat/tests/system/test_shutdown.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_shutdown(self):

def test_shutdown_wait_ok(self):
"""
Test stopping filebeat under load and wait for publisher queue to be emptied.
Test stopping filebeat under load: wait for all events being published.
"""

self.nasa_logs()
Expand Down Expand Up @@ -63,9 +63,10 @@ def test_shutdown_wait_ok(self):
assert len(registry) == 1
assert registry[0]["offset"] == output["offset"]

@unittest.skip("Skipping unreliable test")
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have had to skip the test, as it has been very unreliable and I didn't manage to reproduce it yet for investigation. Plus, this feature as is will be replaced with wait shutdown support directly provided by the new publisher pipeline.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you perhaps open a follow up github issue to track these things?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the only test I have had to skip. But will create meta-ticket for further pipeline work.

def test_shutdown_wait_timeout(self):
"""
Test stopping filebeat under load and wait for publisher queue to be emptied.
Test stopping filebeat under load: allow early shutdown.
"""

self.nasa_logs()
Expand All @@ -80,7 +81,7 @@ def test_shutdown_wait_timeout(self):

# Wait until it tries the first time to publish
self.wait_until(
lambda: self.log_contains("ERR Connecting error publishing events"),
lambda: self.log_contains("ERR Failed to connect"),
max_timeout=15)

filebeat.check_kill_and_wait()
Expand Down
2 changes: 1 addition & 1 deletion generator/beat/{beat}/beater/{beat}.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"

"{beat_path}/config"
)
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"

"github.com/elastic/beats/heartbeat/config"
"github.com/elastic/beats/heartbeat/monitors"
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/beater/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"

"github.com/elastic/beats/heartbeat/monitors"
"github.com/elastic/beats/heartbeat/scheduler"
Expand Down
34 changes: 18 additions & 16 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,14 @@ import (
"github.com/elastic/beats/libbeat/paths"
"github.com/elastic/beats/libbeat/plugin"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher"
"github.com/elastic/beats/libbeat/publisher/bc/publisher"
svc "github.com/elastic/beats/libbeat/service"
"github.com/elastic/beats/libbeat/template"
"github.com/elastic/beats/libbeat/version"

// Register publisher pipeline modules
_ "github.com/elastic/beats/libbeat/publisher/includes"

// Register default processors.
_ "github.com/elastic/beats/libbeat/processors/actions"
_ "github.com/elastic/beats/libbeat/processors/add_cloud_metadata"
Expand Down Expand Up @@ -115,15 +118,15 @@ type Beat struct {

// BeatConfig struct contains the basic configuration of every beat
type BeatConfig struct {
Shipper publisher.ShipperConfig `config:",inline"`
Output map[string]*common.Config `config:"output"`
Monitoring *common.Config `config:"xpack.monitoring"`
Logging logp.Logging `config:"logging"`
Processors processors.PluginConfig `config:"processors"`
Path paths.Path `config:"path"`
Dashboards *common.Config `config:"setup.dashboards"`
Template *common.Config `config:"setup.template"`
Http *common.Config `config:"http"`
Shipper publisher.ShipperConfig `config:",inline"`
Output common.ConfigNamespace `config:"output"`
Monitoring *common.Config `config:"xpack.monitoring"`
Logging logp.Logging `config:"logging"`
Processors processors.PluginConfig `config:"processors"`
Path paths.Path `config:"path"`
Dashboards *common.Config `config:"setup.dashboards"`
Template *common.Config `config:"setup.template"`
Http *common.Config `config:"http"`
}

var (
Expand Down Expand Up @@ -352,11 +355,11 @@ func (b *Beat) Setup(bt Creator, template, dashboards, machineLearning bool) err
}

if template {
esConfig := b.Config.Output["elasticsearch"]
if esConfig == nil || !esConfig.Enabled() {
if b.Config.Output.Name() != "elasticsearch" {
return fmt.Errorf("Template loading requested but the Elasticsearch output is not configured/enabled")
}

esConfig := b.Config.Output.Config()
if b.Config.Template == nil || (b.Config.Template != nil && b.Config.Template.Enabled()) {
loadCallback, err := b.templateLoadingCallback()
if err != nil {
Expand Down Expand Up @@ -560,8 +563,8 @@ func (b *Beat) loadDashboards(force bool) error {
}
}

if b.Config.Dashboards != nil && b.Config.Dashboards.Enabled() {
esConfig := b.Config.Output["elasticsearch"]
if b.Config.Dashboards != nil && b.Config.Dashboards.Enabled() && b.Config.Output.Name() == "elasticsearch" {
esConfig := b.Config.Output.Config()
if esConfig == nil || !esConfig.Enabled() {
return fmt.Errorf("Dashboard loading requested but the Elasticsearch output is not configured/enabled")
}
Expand Down Expand Up @@ -608,9 +611,8 @@ func (b *Beat) registerTemplateLoading() error {
}
}

esConfig := b.Config.Output["elasticsearch"]
// Loads template by default if esOutput is enabled
if esConfig != nil && esConfig.Enabled() {
if b.Config.Output.Name() == "elasticsearch" {
if b.Config.Template == nil || (b.Config.Template != nil && b.Config.Template.Enabled()) {
// load template through callback to make sure it is also loaded
// on reconnecting
Expand Down
13 changes: 12 additions & 1 deletion libbeat/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,8 +337,15 @@ func (ns *ConfigNamespace) Unpack(cfg *Config) error {
return nil
}

var (
err error
found bool
)

for _, name := range fields {
sub, err := cfg.Child(name, -1)
var sub *Config

sub, err = cfg.Child(name, -1)
if err != nil {
// element is no configuration object -> continue so a namespace
// Config unpacked as a namespace can have other configuration
Expand All @@ -356,8 +363,12 @@ func (ns *ConfigNamespace) Unpack(cfg *Config) error {

ns.name = name
ns.config = sub
found = true
}

if !found {
return err
}
return nil
}

Expand Down
29 changes: 10 additions & 19 deletions libbeat/common/fmtstr/formatevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/dtfmt"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/publisher/beat"
)

// EventFormatString implements format string support on events
// of type common.MapStr.
// of type beat.Event.
//
// The concrete event expansion requires the field name enclosed by brackets.
// For example: '%{[field.name]}'. Field names can be separated by points or
Expand Down Expand Up @@ -170,7 +171,7 @@ func (fs *EventFormatString) Fields() []string {

// Run executes the format string returning a new expanded string or an error
// if execution or event field expansion fails.
func (fs *EventFormatString) Run(event common.MapStr) (string, error) {
func (fs *EventFormatString) Run(event *beat.Event) (string, error) {
ctx := newEventCtx(len(fs.fields))
defer releaseCtx(ctx)

Expand All @@ -192,7 +193,7 @@ func (fs *EventFormatString) Run(event common.MapStr) (string, error) {

// RunBytes executes the format string returning a new expanded string of type
// `[]byte` or an error if execution or event field expansion fails.
func (fs *EventFormatString) RunBytes(event common.MapStr) ([]byte, error) {
func (fs *EventFormatString) RunBytes(event *beat.Event) ([]byte, error) {
ctx := newEventCtx(len(fs.fields))
defer releaseCtx(ctx)

Expand All @@ -208,7 +209,7 @@ func (fs *EventFormatString) RunBytes(event common.MapStr) ([]byte, error) {
}

// Eval executes the format string, writing the resulting string into the provided output buffer. Returns error if execution or event field expansion fails.
func (fs *EventFormatString) Eval(out *bytes.Buffer, event common.MapStr) error {
func (fs *EventFormatString) Eval(out *bytes.Buffer, event *beat.Event) error {
ctx := newEventCtx(len(fs.fields))
defer releaseCtx(ctx)

Expand All @@ -227,7 +228,7 @@ func (fs *EventFormatString) IsConst() bool {
// of strings.
func (fs *EventFormatString) collectFields(
ctx *eventEvalContext,
event common.MapStr,
event *beat.Event,
) error {
for _, fi := range fs.fields {
s, err := fieldString(event, fi.path)
Expand All @@ -242,19 +243,7 @@ func (fs *EventFormatString) collectFields(
}

if fs.timestamp {
timestamp, found := event["@timestamp"]
if !found {
return errors.New("missing timestamp")
}

switch t := timestamp.(type) {
case common.Time:
ctx.ts = time.Time(t)
case time.Time:
ctx.ts = t
default:
return errors.New("unknown timestamp type")
}
ctx.ts = event.Timestamp
}

return nil
Expand Down Expand Up @@ -398,7 +387,7 @@ func parseEventPath(field string) (string, error) {
}

// TODO: move to libbeat/common?
func fieldString(event common.MapStr, field string) (string, error) {
func fieldString(event *beat.Event, field string) (string, error) {
v, err := event.GetValue(field)
if err != nil {
return "", err
Expand All @@ -422,6 +411,8 @@ func tryConvString(v interface{}) (string, error) {
return s, nil
case common.Time:
return s.String(), nil
case time.Time:
return common.Time(s).String(), nil
case []byte:
return string(s), nil
case stringer:
Expand Down
Loading