Skip to content

Commit

Permalink
Add Kerberos support to Kafka input and output (elastic#16871)
Browse files Browse the repository at this point in the history
## What does this PR do?

This PR adds initial Kerberos support to Kafka input and output. Until end to end tests are added or there are no bugs reported after the release, I am marking it as beta.

## Why is it important?

This lets Beats connect to Kerberos-aware Kafka instances.

Users have two options when authenticating to Kerberos - using a keytab file or providing a username and password pair.

#### Example authentication using keytab file

```yaml
# Authentication type to use with Kerberos. Available options: keytab, password.
kerberos.auth_type: keytab

# Path to the keytab file. It is used when auth_type is set to keytab.
kerberos.keytab: /etc/krb5kdc/kafka.keytab

# Path to the Kerberos configuration.
kerberos.config_path: /etc/path/config

# The service principal name.
kerberos.service_name: HTTP/kafka@ELASTIC

# Kerberos realm.
kerberos.realm: ELASTIC
```

#### Example authentication using username and password

```yaml
# Authentication type to use with Kerberos. Available options: keytab, password.
kerberos.auth_type: password

# Path to the Kerberos configuration.
kerberos.config_path: /etc/path/config

# The service principal name.
kerberos.service_name: HTTP/kafka@ELASTIC

# Name of the Kerberos user. It is used when auth_type is set to password.
kerberos.username: elastic

# Password of the Kerberos user. It is used when auth_type is set to password.
kerberos.password: changeme

# Kerberos realm.
kerberos.realm: ELASTIC
```
  • Loading branch information
kvch authored Mar 6, 2020
1 parent 8e8da46 commit 62f2971
Show file tree
Hide file tree
Showing 17 changed files with 408 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Windows .exe files now have embedded file version info. {issue}15232[15232]t
- Remove experimental flag from `setup.template.append_fields` {pull}16576[16576]
- Add `add_cloudfoundry_metadata` processor to annotate events with Cloud Foundry application data. {pull}16621[16621]
- Add Kerberos support to Kafka input and output. {pull}16781[16781]

*Auditbeat*

Expand Down
21 changes: 21 additions & 0 deletions auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -784,6 +784,27 @@ output.elasticsearch:
# never, once, and freely. Default is never.
#ssl.renegotiation: never

# Authentication type to use with Kerberos. Available options: keytab, password.
#kerberos.auth_type: password

# Path to the keytab file. It is used when auth_type is set to keytab.
#kerberos.keytab: /etc/krb5kdc/kafka.keytab

# Path to the Kerberos configuration.
#kerberos.config_path: /etc/path/config

# The service principal name.
#kerberos.service_name: HTTP/my-service@realm

# Name of the Kerberos user. It is used when auth_type is set to password.
#kerberos.username: elastic

# Password of the Kerberos user. It is used when auth_type is set to password.
#kerberos.password: changeme

# Kerberos realm.
#kerberos.realm: ELASTIC

#------------------------------- Redis output ----------------------------------
#output.redis:
# Boolean flag to enable or disable the output module.
Expand Down
21 changes: 21 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1490,6 +1490,27 @@ output.elasticsearch:
# never, once, and freely. Default is never.
#ssl.renegotiation: never

# Authentication type to use with Kerberos. Available options: keytab, password.
#kerberos.auth_type: password

# Path to the keytab file. It is used when auth_type is set to keytab.
#kerberos.keytab: /etc/krb5kdc/kafka.keytab

# Path to the Kerberos configuration.
#kerberos.config_path: /etc/path/config

# The service principal name.
#kerberos.service_name: HTTP/my-service@realm

# Name of the Kerberos user. It is used when auth_type is set to password.
#kerberos.username: elastic

# Password of the Kerberos user. It is used when auth_type is set to password.
#kerberos.password: changeme

# Kerberos realm.
#kerberos.realm: ELASTIC

#------------------------------- Redis output ----------------------------------
#output.redis:
# Boolean flag to enable or disable the output module.
Expand Down
16 changes: 16 additions & 0 deletions filebeat/input/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (

"github.com/Shopify/sarama"

"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/common/kafka"
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
Expand All @@ -47,6 +49,7 @@ type kafkaInputConfig struct {
Fetch kafkaFetch `config:"fetch"`
Rebalance kafkaRebalance `config:"rebalance"`
TLS *tlscommon.Config `config:"ssl"`
Kerberos *kerberos.Config `config:"kerberos"`
Username string `config:"username"`
Password string `config:"password"`
ExpandEventListFromField string `config:"expand_event_list_from_field"`
Expand Down Expand Up @@ -177,6 +180,19 @@ func newSaramaConfig(config kafkaInputConfig) (*sarama.Config, error) {
k.Net.TLS.Config = tls.BuildModuleConfig("")
}

if config.Kerberos != nil {
cfgwarn.Beta("Kerberos authentication for Kafka is beta.")
k.Net.SASL.GSSAPI = sarama.GSSAPIConfig{
AuthType: int(config.Kerberos.AuthType),
KeyTabPath: config.Kerberos.KeyTabPath,
KerberosConfigPath: config.Kerberos.ConfigPath,
ServiceName: config.Kerberos.ServiceName,
Username: config.Kerberos.Username,
Password: config.Kerberos.Password,
Realm: config.Kerberos.Realm,
}
}

if config.Username != "" {
k.Net.SASL.Enable = true
k.Net.SASL.User = config.Username
Expand Down
21 changes: 21 additions & 0 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,27 @@ output.elasticsearch:
# never, once, and freely. Default is never.
#ssl.renegotiation: never

# Authentication type to use with Kerberos. Available options: keytab, password.
#kerberos.auth_type: password

# Path to the keytab file. It is used when auth_type is set to keytab.
#kerberos.keytab: /etc/krb5kdc/kafka.keytab

# Path to the Kerberos configuration.
#kerberos.config_path: /etc/path/config

# The service principal name.
#kerberos.service_name: HTTP/my-service@realm

# Name of the Kerberos user. It is used when auth_type is set to password.
#kerberos.username: elastic

# Password of the Kerberos user. It is used when auth_type is set to password.
#kerberos.password: changeme

# Kerberos realm.
#kerberos.realm: ELASTIC

#------------------------------- Redis output ----------------------------------
#output.redis:
# Boolean flag to enable or disable the output module.
Expand Down
21 changes: 21 additions & 0 deletions journalbeat/journalbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,27 @@ output.elasticsearch:
# never, once, and freely. Default is never.
#ssl.renegotiation: never

# Authentication type to use with Kerberos. Available options: keytab, password.
#kerberos.auth_type: password

# Path to the keytab file. It is used when auth_type is set to keytab.
#kerberos.keytab: /etc/krb5kdc/kafka.keytab

# Path to the Kerberos configuration.
#kerberos.config_path: /etc/path/config

# The service principal name.
#kerberos.service_name: HTTP/my-service@realm

# Name of the Kerberos user. It is used when auth_type is set to password.
#kerberos.username: elastic

# Password of the Kerberos user. It is used when auth_type is set to password.
#kerberos.password: changeme

# Kerberos realm.
#kerberos.realm: ELASTIC

#------------------------------- Redis output ----------------------------------
#output.redis:
# Boolean flag to enable or disable the output module.
Expand Down
21 changes: 21 additions & 0 deletions libbeat/_meta/config.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,27 @@ output.elasticsearch:
# Configure what types of renegotiation are supported. Valid options are
# never, once, and freely. Default is never.
#ssl.renegotiation: never

# Authentication type to use with Kerberos. Available options: keytab, password.
#kerberos.auth_type: password

# Path to the keytab file. It is used when auth_type is set to keytab.
#kerberos.keytab: /etc/krb5kdc/kafka.keytab

# Path to the Kerberos configuration.
#kerberos.config_path: /etc/path/config

# The service principal name.
#kerberos.service_name: HTTP/my-service@realm

# Name of the Kerberos user. It is used when auth_type is set to password.
#kerberos.username: elastic

# Password of the Kerberos user. It is used when auth_type is set to password.
#kerberos.password: changeme

# Kerberos realm.
#kerberos.realm: ELASTIC
{{end}}{{if not .ExcludeRedis}}
#------------------------------- Redis output ----------------------------------
#output.redis:
Expand Down
78 changes: 78 additions & 0 deletions libbeat/common/transport/kerberos/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package kerberos

import "fmt"

type AuthType uint

const (
AUTH_PASSWORD = 1
AUTH_KEYTAB = 2

authPassword = "password"
authKeytabStr = "keytab"
)

var (
authTypes = map[string]AuthType{
authPassword: AUTH_PASSWORD,
authKeytabStr: AUTH_KEYTAB,
}
)

type Config struct {
AuthType AuthType `config:"auth_type" validate:"required"`
KeyTabPath string `config:"keytab"`
ConfigPath string `config:"config_path"`
ServiceName string `config:"service_name"`
Username string `config:"username"`
Password string `config:"password"`
Realm string `config:"realm"`
}

// Unpack validates and unpack "auth_type" config option
func (t *AuthType) Unpack(value string) error {
authT, ok := authTypes[value]
if !ok {
return fmt.Errorf("invalid authentication type '%s'", value)
}

*t = authT

return nil
}

func (c *Config) Validate() error {
if c.AuthType == AUTH_PASSWORD {
if c.Username == "" {
return fmt.Errorf("password authentication is selected for Kerberos, but username is not configured")
}
if c.Password == "" {
return fmt.Errorf("password authentication is selected for Kerberos, but password is not configured")
}
}

if c.AuthType == AUTH_KEYTAB {
if c.KeyTabPath == "" {
return fmt.Errorf("keytab authentication is selected for Kerberos, but path to keytab is not configured")
}
}

return nil
}
17 changes: 17 additions & 0 deletions libbeat/outputs/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ import (
"github.com/Shopify/sarama"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
"github.com/elastic/beats/v7/libbeat/common/fmtstr"
"github.com/elastic/beats/v7/libbeat/common/kafka"
"github.com/elastic/beats/v7/libbeat/common/transport/kerberos"
"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
Expand All @@ -38,6 +40,7 @@ import (
type kafkaConfig struct {
Hosts []string `config:"hosts" validate:"required"`
TLS *tlscommon.Config `config:"ssl"`
Kerberos *kerberos.Config `config:"kerberos"`
Timeout time.Duration `config:"timeout" validate:"min=1"`
Metadata metaConfig `config:"metadata"`
Key *fmtstr.EventFormatString `config:"key"`
Expand Down Expand Up @@ -99,6 +102,7 @@ func defaultConfig() kafkaConfig {
return kafkaConfig{
Hosts: nil,
TLS: nil,
Kerberos: nil,
Timeout: 30 * time.Second,
BulkMaxSize: 2048,
BulkFlushFrequency: 0,
Expand Down Expand Up @@ -212,6 +216,19 @@ func newSaramaConfig(log *logp.Logger, config *kafkaConfig) (*sarama.Config, err
k.Net.TLS.Config = tls.BuildModuleConfig("")
}

if config.Kerberos != nil {
cfgwarn.Beta("Kerberos authentication for Kafka is beta.")
k.Net.SASL.GSSAPI = sarama.GSSAPIConfig{
AuthType: int(config.Kerberos.AuthType),
KeyTabPath: config.Kerberos.KeyTabPath,
KerberosConfigPath: config.Kerberos.ConfigPath,
ServiceName: config.Kerberos.ServiceName,
Username: config.Kerberos.Username,
Password: config.Kerberos.Password,
Realm: config.Kerberos.Realm,
}
}

if config.Username != "" {
k.Net.SASL.Enable = true
k.Net.SASL.User = config.Username
Expand Down
44 changes: 44 additions & 0 deletions libbeat/outputs/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,25 @@ func TestConfigAcceptValid(t *testing.T) {
"compression": "lz4",
"version": "1.0.0",
},
"Kerberos with keytab": common.MapStr{
"kerberos": common.MapStr{
"auth_type": "keytab",
"keytab": "/etc/krb5kcd/kafka.keytab",
"config_path": "/etc/path/config",
"service_name": "HTTP/elastic@ELASTIC",
"realm": "ELASTIC",
},
},
"Kerberos with user and password pair": common.MapStr{
"kerberos": common.MapStr{
"auth_type": "password",
"username": "elastic",
"password": "changeme",
"config_path": "/etc/path/config",
"service_name": "HTTP/elastic@ELASTIC",
"realm": "ELASTIC",
},
},
}

for name, test := range tests {
Expand All @@ -52,3 +71,28 @@ func TestConfigAcceptValid(t *testing.T) {
})
}
}

func TestConfigInvalid(t *testing.T) {
tests := map[string]common.MapStr{
"Kerberos with invalid auth_type": common.MapStr{
"kerberos": common.MapStr{
"auth_type": "invalid_auth_type",
"config_path": "/etc/path/config",
"service_name": "HTTP/elastic@ELASTIC",
"realm": "ELASTIC",
},
},
}

for name, test := range tests {
test := test
t.Run(name, func(t *testing.T) {
c := common.MustNewConfigFrom(test)
c.SetString("hosts", 0, "localhost")
_, err := readConfig(c)
if err == nil {
t.Fatalf("Can create test configuration from invalid input")
}
})
}
}
21 changes: 21 additions & 0 deletions metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1478,6 +1478,27 @@ output.elasticsearch:
# never, once, and freely. Default is never.
#ssl.renegotiation: never

# Authentication type to use with Kerberos. Available options: keytab, password.
#kerberos.auth_type: password

# Path to the keytab file. It is used when auth_type is set to keytab.
#kerberos.keytab: /etc/krb5kdc/kafka.keytab

# Path to the Kerberos configuration.
#kerberos.config_path: /etc/path/config

# The service principal name.
#kerberos.service_name: HTTP/my-service@realm

# Name of the Kerberos user. It is used when auth_type is set to password.
#kerberos.username: elastic

# Password of the Kerberos user. It is used when auth_type is set to password.
#kerberos.password: changeme

# Kerberos realm.
#kerberos.realm: ELASTIC

#------------------------------- Redis output ----------------------------------
#output.redis:
# Boolean flag to enable or disable the output module.
Expand Down
Loading

0 comments on commit 62f2971

Please sign in to comment.