diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index b6a6012fc60..cd475e7bfca 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -14,6 +14,8 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di *Affecting all Beats* +- Determine log level for kafka output. {pull}5397[5397] + *Auditbeat* *Filebeat* diff --git a/libbeat/outputs/kafka/log.go b/libbeat/outputs/kafka/log.go index 7613032bd5e..673ee956eac 100644 --- a/libbeat/outputs/kafka/log.go +++ b/libbeat/outputs/kafka/log.go @@ -1,17 +1,38 @@ package kafka -import "github.com/elastic/beats/libbeat/logp" +import ( + "github.com/Shopify/sarama" + + "github.com/elastic/beats/libbeat/logp" +) type kafkaLogger struct{} -func (kafkaLogger) Print(v ...interface{}) { - logp.Warn("kafka message: %v", v...) +func (kl kafkaLogger) Print(v ...interface{}) { + kl.Log("kafka message: %v", v) +} + +func (kl kafkaLogger) Printf(format string, v ...interface{}) { + kl.Log(format, v) } -func (kafkaLogger) Printf(format string, v ...interface{}) { - logp.Warn(format, v...) +func (kl kafkaLogger) Println(v ...interface{}) { + kl.Log("kafka message: %v", v...) } -func (kafkaLogger) Println(v ...interface{}) { - logp.Warn("kafka message: %v", v...) +func (kafkaLogger) Log(format string, v ...interface{}) { + warn := false + for _, val := range v { + if err, ok := val.(sarama.KError); ok { + if err != sarama.ErrNoError { + warn = true + break + } + } + } + if warn { + logp.Warn(format, v) + } else { + logp.Info(format, v) + } }