Skip to content

Commit

Permalink
Add kafka kerberos authentication support for collector/ingester
Browse files Browse the repository at this point in the history
Signed-off-by: Ruben Vargas <[email protected]>
  • Loading branch information
rubenvp8510 committed Jun 6, 2019
1 parent 09dad38 commit 53e2731
Show file tree
Hide file tree
Showing 6 changed files with 297 additions and 18 deletions.
101 changes: 98 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ required = [

[[constraint]]
name = "github.com/Shopify/sarama"
version = "1.20.1"
revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9"

[[constraint]]
name = "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -171,3 +171,7 @@ required = [
[[constraint]]
name = "github.com/hashicorp/go-hclog"
version = "0.8.0"

[[override]]
name = "github.com/Shopify/sarama"
revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9"
65 changes: 65 additions & 0 deletions cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,24 @@ const (
SuffixParallelism = ".parallelism"
// SuffixHTTPPort is a suffix for the HTTP port
SuffixHTTPPort = ".http-port"
// SuffixAuthentication for enable or disable authentication when connect to kafka cluster.
SuffixAuthentication = ".authentication"
// KerberosPrefix for Kerberos configuration options
KerberosPrefix = ".kerberos"
// SuffixKerberosServiceName is the suffix for Kerberos service name
SuffixKerberosServiceName = ".serviceName"
// SuffixKerberosRealm is the suffix for Kerberos realm name
SuffixKerberosRealm = ".realm"
// SuffixKerberosUseKeyTab is the suffix determine if kerberos should use keytab file or password
SuffixKerberosUseKeyTab = ".useKeytab"
// SuffixKerberosUserName is Kerberos username
SuffixKerberosUserName = ".username"
// SuffixKerberosPassword is Kerberos password
SuffixKerberosPassword = ".password"
// SuffixKerberosConfig is path to the kerberos configuration file.
SuffixKerberosConfig = ".kerberosConfig"
// SuffixKerberosKeyTab is path keytab file used instead of password when SuffixKerberosUseKeyTab = true
SuffixKerberosKeyTab = ".keytab"

// DefaultBroker is the default kafka broker
DefaultBroker = "127.0.0.1:9092"
Expand All @@ -63,6 +81,12 @@ const (
DefaultEncoding = kafka.EncodingProto
// DefaultDeadlockInterval is the default deadlock interval
DefaultDeadlockInterval = 1 * time.Minute
// DefaultAuthentication is the default value for enable/disable authentication
DefaultAuthentication = false
// DefaultKerberosConfig is the default kerberos configuration path
DefaultKerberosConfig = "/etc/krb5.conf"
// DefaultKerberosUseKeyTab is the default use of keytab file
DefaultKerberosUseKeyTab = false
)

// Options stores the configuration options for the Ingester
Expand Down Expand Up @@ -103,6 +127,39 @@ func AddFlags(flagSet *flag.FlagSet) {
ConfigPrefix+SuffixDeadlockInterval,
DefaultDeadlockInterval,
"Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.")
flagSet.Bool(
KafkaConsumerConfigPrefix+SuffixAuthentication,
DefaultAuthentication,
"Enable or disable authentication to kafka cluster")
// Kerberos
flagSet.String(
KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosServiceName,
strconv.Itoa(DefaultParallelism),
"Kerberos service name")
flagSet.String(
KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosRealm,
strconv.Itoa(DefaultParallelism),
"Kerberos realm")
flagSet.String(
KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosPassword,
strconv.Itoa(DefaultParallelism),
"The Kerberos password used for authenticate, when "+KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosUseKeyTab+"=false.")
flagSet.String(
KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosUserName,
strconv.Itoa(DefaultParallelism),
"The Kerberos username used for authenticate with KDC")
flagSet.String(
KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosConfig,
DefaultKerberosConfig,
"Path to Kerberos configuration. i.e /etc/krb5.conf")
flagSet.Bool(
KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosUseKeyTab,
DefaultKerberosUseKeyTab,
"Use of keytab instead of password, if this is true, keytab file will be used instead of password")
flagSet.String(
KafkaConsumerConfigPrefix+KerberosPrefix+SuffixKerberosKeyTab,
strconv.Itoa(DefaultParallelism),
"Path to keytab file. i.e /etc/security/kafka.keytab")
}

// InitFromViper initializes Builder with properties from viper
Expand All @@ -115,6 +172,14 @@ func (o *Options) InitFromViper(v *viper.Viper) {

o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
o.DeadlockInterval = v.GetDuration(ConfigPrefix + SuffixDeadlockInterval)

o.Authentication = v.GetBool(KafkaConsumerConfigPrefix + SuffixAuthentication)
o.ServiceName = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosServiceName)
o.Realm = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosRealm)
o.KeyTabPath = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosKeyTab)
o.Password = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosPassword)
o.Username = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosUserName)
o.KerberosConfigPath = v.GetString(ConfigPrefix + KerberosPrefix + SuffixKerberosConfig)
}

// stripWhiteSpace removes all whitespace characters from a string
Expand Down
37 changes: 33 additions & 4 deletions pkg/kafka/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package consumer
import (
"io"

"github.com/Shopify/sarama"
"github.com/bsm/sarama-cluster"
)

Expand All @@ -32,19 +33,47 @@ type Builder interface {
NewConsumer() (Consumer, error)
}

// KerberosConfig describes the configuration properties needed for Kerberos authentication with kafka consumer
type KerberosConfig struct {
ServiceName string
Realm string
UseKeyTab bool
Username string
Password string
KerberosConfigPath string
KeyTabPath string
}

// Configuration describes the configuration properties needed to create a Kafka consumer
type Configuration struct {
Brokers []string
Topic string
GroupID string
ClientID string
Brokers []string
Topic string
GroupID string
ClientID string
Authentication bool
Consumer
KerberosConfig
}

// NewConsumer creates a new kafka consumer
func (c *Configuration) NewConsumer() (Consumer, error) {
saramaConfig := cluster.NewConfig()
saramaConfig.Group.Mode = cluster.ConsumerModePartitions
saramaConfig.ClientID = c.ClientID
if c.Authentication {
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
if c.UseKeyTab {
saramaConfig.Net.SASL.GSSAPI.KeyTabPath = c.KeyTabPath
saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH
} else {
saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH
saramaConfig.Net.SASL.GSSAPI.KeyTabPath = c.Password
}
saramaConfig.Net.SASL.GSSAPI.KerberosConfigPath = c.KerberosConfigPath
saramaConfig.Net.SASL.GSSAPI.Username = c.Username
saramaConfig.Net.SASL.GSSAPI.Realm = c.Realm
saramaConfig.Net.SASL.GSSAPI.ServiceName = c.ServiceName
}

return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig)
}
31 changes: 30 additions & 1 deletion pkg/kafka/producer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,43 @@ type Builder interface {
NewProducer() (sarama.AsyncProducer, error)
}

// KerberosConfig describes the configuration properties needed for Kerberos authentication with kafka consumer
type KerberosConfig struct {
ServiceName string
Realm string
UseKeyTab bool
Username string
Password string
KerberosConfigPath string
KeyTabPath string
}

// Configuration describes the configuration properties needed to create a Kafka producer
type Configuration struct {
Brokers []string
Brokers []string
Authentication bool
KerberosConfig
}

// NewProducer creates a new asynchronous kafka producer
func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Producer.Return.Successes = true
saramaConfig.Net.SASL.Enable = c.Authentication
// This is ok for now, we only support kerberos.
if c.Authentication {
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
if c.UseKeyTab {
saramaConfig.Net.SASL.GSSAPI.KeyTabPath = c.KeyTabPath
saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH
} else {
saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH
saramaConfig.Net.SASL.GSSAPI.KeyTabPath = c.Password
}
saramaConfig.Net.SASL.GSSAPI.KerberosConfigPath = c.KerberosConfigPath
saramaConfig.Net.SASL.GSSAPI.Username = c.Username
saramaConfig.Net.SASL.GSSAPI.Realm = c.Realm
saramaConfig.Net.SASL.GSSAPI.ServiceName = c.ServiceName
}
return sarama.NewAsyncProducer(c.Brokers, saramaConfig)
}
Loading

0 comments on commit 53e2731

Please sign in to comment.