Skip to content

Commit

Permalink
Update sarama (kafka client) (elastic#7025)
Browse files Browse the repository at this point in the history
- update sarama to v1.16.0
- Use kafka 1.0.0 for testing
- add supported kafka versions up to 1.1.0
- Fix lz4 not being configurable
- Add unit test on config validation
- Separate kafka version definitions
  • Loading branch information
Steffen Siering authored and stevea78 committed May 20, 2018
1 parent 24e9362 commit 9bcee93
Show file tree
Hide file tree
Showing 98 changed files with 6,559 additions and 452 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Rename beat.cpu.*.time metrics to beat.cpu.*.time.ms. {pull}6449[6449]
- Mark `system.syslog.message` and `system.auth.message` as `text` instead of `keyword`. {pull}6589[6589]
- Allow override of dynamic template `match_mapping_type` for fields with object_type. {pull}6691[6691]
- Set default kafka version to 1.0.0 in kafka output. Older versions are still supported by configuring the `version` setting. {pull}7025[7025]

*Auditbeat*

Expand Down Expand Up @@ -56,6 +57,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Ensure Kubernetes labels/annotations don't break mapping {pull}6490[6490]
- Ensure that the dashboard zip files can't contain files outside of the kibana directory. {pull}6921[6921]
- Fix map overwrite panics by cloning shared structs before doing the update. {pull}6947[6947]
- Fix error if lz4 compression is used with the kafka output. {pull}7025[7025]

*Auditbeat*

Expand Down Expand Up @@ -123,6 +125,7 @@ https://github.com/elastic/beats/compare/v6.2.3...master[Check the HEAD diff]
- Add IP-addresses and MAC-addresses to add_host_metadata. {pull}6878[6878]
- Add a default seccomp (secure computing) filter on Linux that prohibits
execve, execveat, fork, and vfork syscalls. A custom policy can be configured. {issue}5213[5213]
- Update Sarama to v1.16.0, adding better support for kafka 0.11, 1.0, and 1.1 {pull}7025[7025]

*Auditbeat*

Expand Down
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2012,8 +2012,8 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
--------------------------------------------------------------------
Dependency: github.com/Shopify/sarama
Version: v1.12/enh/offset-replica-id
Revision: c292021939f5aba53b3ffc2cb09c7aadb32a42df
Version: v1.16.0/enh/offset-replica-id
Revision: 32b4ad5c9537ed14e471779b76713ff65420db39
License type (autodetected): MIT
./vendor/github.com/Shopify/sarama/LICENSE:
--------------------------------------------------------------------
Expand Down
113 changes: 109 additions & 4 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@ import (
"strings"
"time"

"github.com/Shopify/sarama"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/common/fmtstr"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
"github.com/elastic/beats/libbeat/monitoring/adapter"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/codec"
)
Expand Down Expand Up @@ -44,8 +49,17 @@ type metaRetryConfig struct {
Backoff time.Duration `config:"backoff" validate:"min=0"`
}

var (
defaultConfig = kafkaConfig{
// compressionModes maps the values accepted for the kafka output's
// `compression` setting to the corresponding sarama compression codec.
// Lookup is done on the lowercased setting (see newSaramaConfig), and
// "none", "no", and "off" are all aliases for disabling compression.
var compressionModes = map[string]sarama.CompressionCodec{
	"none":   sarama.CompressionNone,
	"no":     sarama.CompressionNone,
	"off":    sarama.CompressionNone,
	"gzip":   sarama.CompressionGZIP,
	"lz4":    sarama.CompressionLZ4,
	"snappy": sarama.CompressionSnappy,
}

func defaultConfig() kafkaConfig {
return kafkaConfig{
Hosts: nil,
TLS: nil,
Timeout: 30 * time.Second,
Expand All @@ -62,14 +76,14 @@ var (
RequiredACKs: nil, // use library default
BrokerTimeout: 10 * time.Second,
Compression: "gzip",
Version: "",
Version: "1.0.0",
MaxRetries: 3,
ClientID: "beats",
ChanBufferSize: 256,
Username: "",
Password: "",
}
)
}

func (c *kafkaConfig) Validate() error {
if len(c.Hosts) == 0 {
Expand All @@ -90,3 +104,94 @@ func (c *kafkaConfig) Validate() error {

return nil
}

// newSaramaConfig translates the beats kafka output configuration into a
// sarama.Config. It returns an error for an unknown compression mode, an
// unknown/unsupported kafka version, or any setting rejected by sarama's
// own Validate.
func newSaramaConfig(config *kafkaConfig) (*sarama.Config, error) {
	partitioner, err := makePartitioner(config.Partition)
	if err != nil {
		return nil, err
	}

	k := sarama.NewConfig()

	// configure network level properties
	timeout := config.Timeout
	k.Net.DialTimeout = timeout
	k.Net.ReadTimeout = timeout
	k.Net.WriteTimeout = timeout
	k.Net.KeepAlive = config.KeepAlive
	k.Producer.Timeout = config.BrokerTimeout

	tls, err := outputs.LoadTLSConfig(config.TLS)
	if err != nil {
		return nil, err
	}
	if tls != nil {
		k.Net.TLS.Enable = true
		k.Net.TLS.Config = tls.BuildModuleConfig("")
	}

	// SASL/PLAIN authentication is enabled whenever a username is set.
	if config.Username != "" {
		k.Net.SASL.Enable = true
		k.Net.SASL.User = config.Username
		k.Net.SASL.Password = config.Password
	}

	// configure metadata update properties
	k.Metadata.Retry.Max = config.Metadata.Retry.Max
	k.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
	k.Metadata.RefreshFrequency = config.Metadata.RefreshFreq

	// configure producer API properties
	if config.MaxMessageBytes != nil {
		k.Producer.MaxMessageBytes = *config.MaxMessageBytes
	}
	if config.RequiredACKs != nil {
		k.Producer.RequiredAcks = sarama.RequiredAcks(*config.RequiredACKs)
	}

	// Compression lookup is case-insensitive; see compressionModes for the
	// accepted values.
	compressionMode, ok := compressionModes[strings.ToLower(config.Compression)]
	if !ok {
		return nil, fmt.Errorf("unknown compression mode: '%v'", config.Compression)
	}
	k.Producer.Compression = compressionMode

	k.Producer.Return.Successes = true // enable return channel for signaling
	k.Producer.Return.Errors = true

	// have retries being handled by libbeat, disable retries in sarama library
	retryMax := config.MaxRetries
	if retryMax < 0 {
		retryMax = 1000
	}
	k.Producer.Retry.Max = retryMax
	// TODO: k.Producer.Retry.Backoff = ?

	// configure per broker go channel buffering
	k.ChannelBufferSize = config.ChanBufferSize

	// configure client ID
	k.ClientID = config.ClientID

	version, ok := kafkaVersions[config.Version]
	if !ok {
		return nil, fmt.Errorf("unknown/unsupported kafka version: %v", config.Version)
	}
	k.Version = version

	k.Producer.Partitioner = partitioner

	// BUGFIX: MetricRegistry used to be assigned twice — first from
	// kafkaMetricsRegistry(), then immediately overwritten here. The first
	// assignment was a dead store; only the adapter-backed registry below
	// ever took effect, so it is the one kept.
	k.MetricRegistry = adapter.GetGoMetrics(
		monitoring.Default,
		"libbeat.outputs.kafka",
		adapter.Rename("incoming-byte-rate", "bytes_read"),
		adapter.Rename("outgoing-byte-rate", "bytes_write"),
		adapter.GoMetricsNilify,
	)

	if err := k.Validate(); err != nil {
		logp.Err("Invalid kafka configuration: %v", err)
		return nil, err
	}
	return k, nil
}
41 changes: 41 additions & 0 deletions libbeat/outputs/kafka/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package kafka

import (
"testing"

"github.com/elastic/beats/libbeat/common"
)

// TestConfigAcceptValid verifies that the default configuration and a set of
// known-good setting combinations survive both config unpacking and
// translation into a validated sarama configuration.
func TestConfigAcceptValid(t *testing.T) {
	// Composite-literal element types elided per gofmt -s.
	tests := map[string]common.MapStr{
		"default config is valid": {},
		"lz4 with 0.11": {
			"compression": "lz4",
			"version":     "0.11",
		},
		"lz4 with 1.0": {
			"compression": "lz4",
			"version":     "1.0.0",
		},
	}

	for name, test := range tests {
		test := test // capture range variable per iteration
		t.Run(name, func(t *testing.T) {
			c, err := common.NewConfigFrom(test)
			if err != nil {
				t.Fatalf("Can not create test configuration: %v", err)
			}
			// Validate requires at least one host; a SetString failure here
			// must not be silently ignored, or it would surface later as a
			// confusing validation error.
			if err := c.SetString("hosts", 0, "localhost"); err != nil {
				t.Fatalf("Can not set hosts in test configuration: %v", err)
			}

			cfg := defaultConfig()
			if err := c.Unpack(&cfg); err != nil {
				t.Fatalf("Unpacking configuration failed: %v", err)
			}

			if _, err := newSaramaConfig(&cfg); err != nil {
				t.Fatalf("Failure creating sarama config: %v", err)
			}
		})
	}
}
Loading

0 comments on commit 9bcee93

Please sign in to comment.