Skip to content

Commit

Permalink
Add kafka kerberos authentication support for collector/ingester (#1589)
Browse files Browse the repository at this point in the history
Signed-off-by: Ruben Vargas <[email protected]>
  • Loading branch information
rubenvp8510 authored and pavolloffay committed Jun 27, 2019
1 parent 3588002 commit 5b58ebc
Show file tree
Hide file tree
Showing 11 changed files with 328 additions and 15 deletions.
101 changes: 98 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ required = [

[[constraint]]
name = "github.com/Shopify/sarama"
version = "1.20.1"
revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9"

[[constraint]]
name = "github.com/grpc-ecosystem/go-grpc-middleware"
Expand All @@ -171,3 +171,7 @@ required = [
[[constraint]]
name = "github.com/hashicorp/go-hclog"
version = "0.8.0"

[[override]]
name = "github.com/Shopify/sarama"
revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9"
9 changes: 5 additions & 4 deletions cmd/ingester/app/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit
spanProcessor := processor.NewSpanProcessor(spParams)

consumerConfig := kafkaConsumer.Configuration{
Brokers: options.Brokers,
Topic: options.Topic,
GroupID: options.GroupID,
ClientID: options.ClientID,
Brokers: options.Brokers,
Topic: options.Topic,
GroupID: options.GroupID,
ClientID: options.ClientID,
AuthenticationConfig: options.AuthenticationConfig,
}
saramaConsumer, err := consumerConfig.NewConsumer()
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/spf13/viper"

"github.com/jaegertracing/jaeger/pkg/kafka/auth"
kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)
Expand All @@ -48,7 +49,6 @@ const (
SuffixParallelism = ".parallelism"
// SuffixHTTPPort is a suffix for the HTTP port
SuffixHTTPPort = ".http-port"

// DefaultBroker is the default kafka broker
DefaultBroker = "127.0.0.1:9092"
// DefaultTopic is the default kafka topic
Expand Down Expand Up @@ -103,6 +103,8 @@ func AddFlags(flagSet *flag.FlagSet) {
ConfigPrefix+SuffixDeadlockInterval,
DefaultDeadlockInterval,
"Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.")
// Authentication flags
auth.AddFlags(KafkaConsumerConfigPrefix, flagSet)
}

// InitFromViper initializes Builder with properties from viper
Expand All @@ -115,6 +117,9 @@ func (o *Options) InitFromViper(v *viper.Viper) {

o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism)
o.DeadlockInterval = v.GetDuration(ConfigPrefix + SuffixDeadlockInterval)
authenticationOptions := auth.AuthenticationConfig{}
authenticationOptions.InitFromViper(KafkaConsumerConfigPrefix, v)
o.AuthenticationConfig = authenticationOptions
}

// stripWhiteSpace removes all whitespace characters from a string
Expand Down
1 change: 1 addition & 0 deletions pkg/kafka/auth/.nocover
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
requires connection to Kafka
65 changes: 65 additions & 0 deletions pkg/kafka/auth/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package auth

import (
"log"
"strings"

"github.com/Shopify/sarama"
"github.com/spf13/viper"
)

const none = "none"
const kerberos = "kerberos"

var authTypes = []string{
none,
kerberos,
}

// AuthenticationConfig describes the configuration properties needed authenticate with kafka cluster
type AuthenticationConfig struct {
Authentication string
Kerberos KerberosConfig
}

//SetConfiguration set configure authentication into sarama config structure
func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config) {
authentication := strings.ToLower(config.Authentication)
if strings.Trim(authentication, " ") == "" {
authentication = none
}
switch authentication {
case kerberos:
setKerberosConfiguration(&config.Kerberos, saramaConfig)
case none:
return
default:
log.Fatalf("Unknown/Unsupported authentication method %s to kafka cluster.", config.Authentication)
}
}

// InitFromViper loads authentication configuration from viper flags.
func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper.Viper) {
config.Authentication = v.GetString(configPrefix + suffixAuthentication)
config.Kerberos.ServiceName = v.GetString(configPrefix + kerberosPrefix + suffixKerberosServiceName)
config.Kerberos.Realm = v.GetString(configPrefix + kerberosPrefix + suffixKerberosRealm)
config.Kerberos.UseKeyTab = v.GetBool(configPrefix + kerberosPrefix + suffixKerberosUseKeyTab)
config.Kerberos.Username = v.GetString(configPrefix + kerberosPrefix + suffixKerberosUserName)
config.Kerberos.Password = v.GetString(configPrefix + kerberosPrefix + suffixKerberosPassword)
config.Kerberos.ConfigPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosConfig)
config.Kerberos.KeyTabPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosKeyTab)
}
46 changes: 46 additions & 0 deletions pkg/kafka/auth/kerberos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package auth

import (
"github.com/Shopify/sarama"
)

// KerberosConfig describes the configuration properties needed for Kerberos authentication with kafka consumer
type KerberosConfig struct {
ServiceName string
Realm string
UseKeyTab bool
Username string
Password string
ConfigPath string
KeyTabPath string
}

func setKerberosConfiguration(config *KerberosConfig, saramaConfig *sarama.Config) {
saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI
saramaConfig.Net.SASL.Enable = true
if config.UseKeyTab {
saramaConfig.Net.SASL.GSSAPI.KeyTabPath = config.KeyTabPath
saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH
} else {
saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH
saramaConfig.Net.SASL.GSSAPI.Password = config.Password
}
saramaConfig.Net.SASL.GSSAPI.KerberosConfigPath = config.ConfigPath
saramaConfig.Net.SASL.GSSAPI.Username = config.Username
saramaConfig.Net.SASL.GSSAPI.Realm = config.Realm
saramaConfig.Net.SASL.GSSAPI.ServiceName = config.ServiceName
}
Loading

0 comments on commit 5b58ebc

Please sign in to comment.