diff --git a/Gopkg.lock b/Gopkg.lock index 39856a6f36d1..1ca09c857b8c 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -42,15 +42,14 @@ revision = "de5bf2ad457846296e2031421a34e2568e304e35" [[projects]] - digest = "1:7e31a67d6e81ae7bac48b27c9260ad164eff0abdb4300d0f2aa8d4856cc45479" + digest = "1:42831312efd0280d7ea16fc3e85819127852416547e6b35370fa0845455c56eb" name = "github.com/Shopify/sarama" packages = [ ".", "mocks", ] pruneopts = "UT" - revision = "ea9ab1c316850bee881a07bb2555ee8a685cd4b6" - version = "v1.22.1" + revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9" [[projects]] digest = "1:8515c0ca4381246cf332cee05fc84070bbbb07bd679b639161506ba532f47128" @@ -482,6 +481,14 @@ revision = "52e1c4730856c1438ced7597c9b5c585a7bd06a2" version = "v1.0.0" +[[projects]] + digest = "1:f14364057165381ea296e49f8870a9ffce2b8a95e34d6ae06c759106aaef428c" + name = "github.com/hashicorp/go-uuid" + packages = ["."] + pruneopts = "UT" + revision = "4f571afc59f3043a65f8fe6bf46d887b10a01d43" + version = "v1.0.1" + [[projects]] digest = "1:c0d19ab64b32ce9fe5cf4ddceba78d5bc9807f0016db6b1183599da3dcc24d10" name = "github.com/hashicorp/hcl" @@ -517,6 +524,17 @@ revision = "76626ae9c91c4f2a10f34cad8ce83ea42c93bb75" version = "v1.0" +[[projects]] + branch = "master" + digest = "1:ae221758bdddd57f5c76f4ee5e4110af32ee62583c46299094697f8f127e63da" + name = "github.com/jcmturner/gofork" + packages = [ + "encoding/asn1", + "x/crypto/pbkdf2", + ] + pruneopts = "UT" + revision = "dc7c13fece037a4a36e2b3c69db4991498d30692" + [[projects]] digest = "1:15ec2166e33ef6c60b344a04d050eec79193517e7f5082b6233b2d09ef0d10b8" name = "github.com/kisielk/gotool" @@ -982,6 +1000,17 @@ revision = "27376062155ad36be76b0f12cf1572a221d3a48c" version = "v1.10.0" +[[projects]] + branch = "master" + digest = "1:04b43fe96213ea69cfa6e6b8be218a43a375035ea09d9bdda9fed2691f5a7e76" + name = "golang.org/x/crypto" + packages = [ + "md4", + "pbkdf2", + ] + pruneopts = "UT" + revision = "f99c8df09eb5bff426315721bfa5f16a99cad32c" + [[projects]] branch = "master" digest = "1:6bf120070ed448fd0a139da7b9514006b3390bd815a0013c50cc672c24a74fa1" @@ -1126,6 +1155,72 @@ revision = "d2d2541c53f18d2a059457998ce2876cc8e67cbf" version = "v0.9.1" +[[projects]] + digest = "1:c902038ee2d6f964d3b9f2c718126571410c5d81251cbab9fe58abd37803513c" + name = "gopkg.in/jcmturner/aescts.v1" + packages = ["."] + pruneopts = "UT" + revision = "f6abebb3171c4c1b1fea279cb7c7325020a26290" + version = "v1.0.1" + +[[projects]] + digest = "1:a1a3e185c03d79a7452d5d5b4c91be4cc433f55e6ed3a35233d852c966e39013" + name = "gopkg.in/jcmturner/dnsutils.v1" + packages = ["."] + pruneopts = "UT" + revision = "13eeb8d49ffb74d7a75784c35e4d900607a3943c" + version = "v1.0.1" + +[[projects]] + digest = "1:462bc6dbe06e0f5d060b651bcefe0b4a2433799be6758285b888e9ef188c6411" + name = "gopkg.in/jcmturner/gokrb5.v7" + packages = [ + "asn1tools", + "client", + "config", + "credentials", + "crypto", + "crypto/common", + "crypto/etype", + "crypto/rfc3961", + "crypto/rfc3962", + "crypto/rfc4757", + "crypto/rfc8009", + "gssapi", + "iana", + "iana/addrtype", + "iana/adtype", + "iana/asnAppTag", + "iana/chksumtype", + "iana/errorcode", + "iana/etypeID", + "iana/flags", + "iana/keyusage", + "iana/msgtype", + "iana/nametype", + "iana/patype", + "kadmin", + "keytab", + "krberror", + "messages", + "pac", + "types", + ] + pruneopts = "UT" + revision = "bae8ea1f6fab91f6bcb830efe54eb697c8350050" + version = "v7.2.4" + +[[projects]] + digest = "1:0f16d9c577198e3b8d3209f5a89aabe679525b2aba2a7548714e973035c0e232" + name = "gopkg.in/jcmturner/rpc.v1" + packages = [ + "mstypes", + "ndr", + ] + pruneopts = "UT" + revision = "99a8ce2fbf8b8087b6ed12a37c61b10f04070043" + version = "v1.1.0" + [[projects]] digest = "1:9a1d716749c77399bfa71792d77eef3278586423947f3431dbac6d6049c24787" name = "gopkg.in/olivere/elastic.v5" diff --git a/Gopkg.toml b/Gopkg.toml index 8641413990b4..5f920fdd00ef 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -150,7 +150,7 @@ required = [ [[constraint]] name = "github.com/Shopify/sarama" - version = "1.20.1" + revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9" [[constraint]] name = "github.com/grpc-ecosystem/go-grpc-middleware" @@ -171,3 +171,7 @@ required = [ [[constraint]] name = "github.com/hashicorp/go-hclog" version = "0.8.0" + +[[override]] + name = "github.com/Shopify/sarama" + revision = "cd910a683f9faa57222e5120d17b60d2e65f7fa9" \ No newline at end of file diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go index 7fb88dbf2278..279823fe9437 100644 --- a/cmd/ingester/app/builder/builder.go +++ b/cmd/ingester/app/builder/builder.go @@ -51,10 +51,11 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit spanProcessor := processor.NewSpanProcessor(spParams) consumerConfig := kafkaConsumer.Configuration{ - Brokers: options.Brokers, - Topic: options.Topic, - GroupID: options.GroupID, - ClientID: options.ClientID, + Brokers: options.Brokers, + Topic: options.Topic, + GroupID: options.GroupID, + ClientID: options.ClientID, + AuthenticationConfig: options.AuthenticationConfig, } saramaConsumer, err := consumerConfig.NewConsumer() if err != nil { diff --git a/cmd/ingester/app/flags.go b/cmd/ingester/app/flags.go index 39e98e387dae..9eba6832865a 100644 --- a/cmd/ingester/app/flags.go +++ b/cmd/ingester/app/flags.go @@ -23,6 +23,7 @@ import ( "github.com/spf13/viper" + "github.com/jaegertracing/jaeger/pkg/kafka/auth" kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer" "github.com/jaegertracing/jaeger/plugin/storage/kafka" ) @@ -48,7 +49,6 @@ const ( SuffixParallelism = ".parallelism" // SuffixHTTPPort is a suffix for the HTTP port SuffixHTTPPort = ".http-port" - // DefaultBroker is the default kafka broker DefaultBroker = "127.0.0.1:9092" // DefaultTopic is the default kafka topic @@ -103,6 +103,8 @@ func AddFlags(flagSet *flag.FlagSet) { ConfigPrefix+SuffixDeadlockInterval, DefaultDeadlockInterval, "Interval to check for deadlocks. If no messages gets processed in given time, ingester app will exit. Value of 0 disables deadlock check.") + // Authentication flags + auth.AddFlags(KafkaConsumerConfigPrefix, flagSet) } // InitFromViper initializes Builder with properties from viper @@ -115,6 +117,9 @@ func (o *Options) InitFromViper(v *viper.Viper) { o.Parallelism = v.GetInt(ConfigPrefix + SuffixParallelism) o.DeadlockInterval = v.GetDuration(ConfigPrefix + SuffixDeadlockInterval) + authenticationOptions := auth.AuthenticationConfig{} + authenticationOptions.InitFromViper(KafkaConsumerConfigPrefix, v) + o.AuthenticationConfig = authenticationOptions } // stripWhiteSpace removes all whitespace characters from a string diff --git a/pkg/kafka/auth/.nocover b/pkg/kafka/auth/.nocover new file mode 100644 index 000000000000..98344a6f8ba2 --- /dev/null +++ b/pkg/kafka/auth/.nocover @@ -0,0 +1 @@ +requires connection to Kafka diff --git a/pkg/kafka/auth/config.go b/pkg/kafka/auth/config.go new file mode 100644 index 000000000000..8640f6ff2c84 --- /dev/null +++ b/pkg/kafka/auth/config.go @@ -0,0 +1,65 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package auth + +import ( + "log" + "strings" + + "github.com/Shopify/sarama" + "github.com/spf13/viper" +) + +const none = "none" +const kerberos = "kerberos" + +var authTypes = []string { + none, + kerberos, +} + +// AuthenticationConfig describes the configuration properties needed authenticate with kafka cluster +type AuthenticationConfig struct { + Authentication string + Kerberos KerberosConfig +} + +//SetConfiguration set configure authentication into sarama config structure +func (config *AuthenticationConfig) SetConfiguration(saramaConfig *sarama.Config) { + authentication := strings.ToLower(config.Authentication) + if strings.Trim(authentication, " ") == "" { + authentication = none + } + switch authentication { + case kerberos: + setKerberosConfiguration(&config.Kerberos, saramaConfig) + case none: + return + default: + log.Fatalf("Unknown/Unsupported authentication method %s to kafka cluster.", config.Authentication) + } +} + +// InitFromViper loads authentication configuration from viper flags. +func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper.Viper) { + config.Authentication = v.GetString(configPrefix + suffixAuthentication) + config.Kerberos.ServiceName = v.GetString(configPrefix + kerberosPrefix + suffixKerberosServiceName) + config.Kerberos.Realm = v.GetString(configPrefix + kerberosPrefix + suffixKerberosRealm) + config.Kerberos.UseKeyTab = v.GetBool(configPrefix + kerberosPrefix + suffixKerberosUseKeyTab) + config.Kerberos.Username = v.GetString(configPrefix + kerberosPrefix + suffixKerberosUserName) + config.Kerberos.Password = v.GetString(configPrefix + kerberosPrefix + suffixKerberosPassword) + config.Kerberos.ConfigPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosConfig) + config.Kerberos.KeyTabPath = v.GetString(configPrefix + kerberosPrefix + suffixKerberosKeyTab) +} diff --git a/pkg/kafka/auth/kerberos.go b/pkg/kafka/auth/kerberos.go new file mode 100644 index 000000000000..5418a3f00bbb --- /dev/null +++ b/pkg/kafka/auth/kerberos.go @@ -0,0 +1,46 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package auth + +import ( + "github.com/Shopify/sarama" +) + +// KerberosConfig describes the configuration properties needed for Kerberos authentication with kafka consumer +type KerberosConfig struct { + ServiceName string + Realm string + UseKeyTab bool + Username string + Password string + ConfigPath string + KeyTabPath string +} + +func setKerberosConfiguration(config *KerberosConfig, saramaConfig *sarama.Config) { + saramaConfig.Net.SASL.Mechanism = sarama.SASLTypeGSSAPI + saramaConfig.Net.SASL.Enable = true + if config.UseKeyTab { + saramaConfig.Net.SASL.GSSAPI.KeyTabPath = config.KeyTabPath + saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_KEYTAB_AUTH + } else { + saramaConfig.Net.SASL.GSSAPI.AuthType = sarama.KRB5_USER_AUTH + saramaConfig.Net.SASL.GSSAPI.Password = config.Password + } + saramaConfig.Net.SASL.GSSAPI.KerberosConfigPath = config.ConfigPath + saramaConfig.Net.SASL.GSSAPI.Username = config.Username + saramaConfig.Net.SASL.GSSAPI.Realm = config.Realm + saramaConfig.Net.SASL.GSSAPI.ServiceName = config.ServiceName +} diff --git a/pkg/kafka/auth/options.go b/pkg/kafka/auth/options.go new file mode 100644 index 000000000000..b68025bc202e --- /dev/null +++ b/pkg/kafka/auth/options.go @@ -0,0 +1,84 @@ +// Copyright (c) 2019 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package auth + +import ( + "flag" + "strings" +) + +const ( + suffixAuthentication = ".authentication" + + // Kerberos configuration options + kerberosPrefix = ".kerberos" + suffixKerberosServiceName = ".service-name" + suffixKerberosRealm = ".realm" + suffixKerberosUseKeyTab = ".use-keytab" + suffixKerberosUserName = ".username" + suffixKerberosPassword = ".password" + suffixKerberosConfig = ".config-file" + suffixKerberosKeyTab = ".keytab-file" + + defaultAuthentication = none + defaultKerberosConfig = "/etc/krb5.conf" + defaultKerberosUseKeyTab = false + defaultKerberosServiceName = "kafka" + defaultKerberosRealm = "" + defaultKerberosPassword = "" + defaultKerberosUsername = "" + defaultKerberosKeyTab = "/etc/security/kafka.keytab" +) + +func addKerberosFlags(configPrefix string, flagSet *flag.FlagSet) { + flagSet.String( + configPrefix+kerberosPrefix+suffixKerberosServiceName, + defaultKerberosServiceName, + "Kerberos service name") + flagSet.String( + configPrefix+kerberosPrefix+suffixKerberosRealm, + defaultKerberosRealm, + "Kerberos realm") + flagSet.String( + configPrefix+kerberosPrefix+suffixKerberosPassword, + defaultKerberosPassword, + "The Kerberos password used for authenticate with KDC") + flagSet.String( + configPrefix+kerberosPrefix+suffixKerberosUserName, + defaultKerberosUsername, + "The Kerberos username used for authenticate with KDC") + flagSet.String( + configPrefix+kerberosPrefix+suffixKerberosConfig, + defaultKerberosConfig, + "Path to Kerberos configuration. i.e /etc/krb5.conf") + flagSet.Bool( + configPrefix+kerberosPrefix+suffixKerberosUseKeyTab, + defaultKerberosUseKeyTab, + "Use of keytab instead of password, if this is true, keytab file will be used instead of password") + flagSet.String( + configPrefix+kerberosPrefix+suffixKerberosKeyTab, + defaultKerberosKeyTab, + "Path to keytab file. i.e /etc/security/kafka.keytab") +} + +// AddFlags add configuration flags to a flagSet. +func AddFlags(configPrefix string, flagSet *flag.FlagSet) { + flagSet.String( + configPrefix+suffixAuthentication, + defaultAuthentication, + "Authentication type used to authenticate with kafka cluster. e.g. " + strings.Join(authTypes,", "), + ) + addKerberosFlags(configPrefix, flagSet) +} diff --git a/pkg/kafka/consumer/config.go b/pkg/kafka/consumer/config.go index 094914e1b6e2..509800b0bdb8 100644 --- a/pkg/kafka/consumer/config.go +++ b/pkg/kafka/consumer/config.go @@ -18,6 +18,8 @@ import ( "io" "github.com/bsm/sarama-cluster" + + "github.com/jaegertracing/jaeger/pkg/kafka/auth" ) // Consumer is an interface to features of Sarama that are necessary for the consumer @@ -39,6 +41,7 @@ type Configuration struct { GroupID string ClientID string Consumer + auth.AuthenticationConfig } // NewConsumer creates a new kafka consumer @@ -46,5 +49,6 @@ func (c *Configuration) NewConsumer() (Consumer, error) { saramaConfig := cluster.NewConfig() saramaConfig.Group.Mode = cluster.ConsumerModePartitions saramaConfig.ClientID = c.ClientID + c.AuthenticationConfig.SetConfiguration(&saramaConfig.Config) return cluster.NewConsumer(c.Brokers, c.GroupID, []string{c.Topic}, saramaConfig) } diff --git a/pkg/kafka/producer/config.go b/pkg/kafka/producer/config.go index 5079ca8fef22..c866c4d957f0 100644 --- a/pkg/kafka/producer/config.go +++ b/pkg/kafka/producer/config.go @@ -16,6 +16,8 @@ package producer import ( "github.com/Shopify/sarama" + + "github.com/jaegertracing/jaeger/pkg/kafka/auth" ) // Builder builds a new kafka producer @@ -26,11 +28,13 @@ type Builder interface { // Configuration describes the configuration properties needed to create a Kafka producer type Configuration struct { Brokers []string + auth.AuthenticationConfig } // NewProducer creates a new asynchronous kafka producer func (c *Configuration) NewProducer() (sarama.AsyncProducer, error) { saramaConfig := sarama.NewConfig() saramaConfig.Producer.Return.Successes = true + c.AuthenticationConfig.SetConfiguration(saramaConfig) return sarama.NewAsyncProducer(c.Brokers, saramaConfig) } diff --git a/plugin/storage/kafka/options.go b/plugin/storage/kafka/options.go index 4ecb18ce1435..136449d6e14b 100644 --- a/plugin/storage/kafka/options.go +++ b/plugin/storage/kafka/options.go @@ -21,6 +21,7 @@ import ( "github.com/spf13/viper" + "github.com/jaegertracing/jaeger/pkg/kafka/auth" "github.com/jaegertracing/jaeger/pkg/kafka/producer" ) @@ -32,11 +33,10 @@ const ( // EncodingZipkinThrift is used for spans encoded as Zipkin Thrift. EncodingZipkinThrift = "zipkin-thrift" - configPrefix = "kafka.producer" - suffixBrokers = ".brokers" - suffixTopic = ".topic" - suffixEncoding = ".encoding" - + configPrefix = "kafka.producer" + suffixBrokers = ".brokers" + suffixTopic = ".topic" + suffixEncoding = ".encoding" defaultBroker = "127.0.0.1:9092" defaultTopic = "jaeger-spans" defaultEncoding = EncodingProto @@ -69,12 +69,16 @@ func (opt *Options) AddFlags(flagSet *flag.FlagSet) { defaultEncoding, fmt.Sprintf(`(experimental) Encoding of spans ("%s" or "%s") sent to kafka.`, EncodingJSON, EncodingProto), ) + auth.AddFlags(configPrefix, flagSet) } // InitFromViper initializes Options with properties from viper func (opt *Options) InitFromViper(v *viper.Viper) { + authenticationOptions := auth.AuthenticationConfig{} + authenticationOptions.InitFromViper(configPrefix, v) opt.config = producer.Configuration{ - Brokers: strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","), + Brokers: strings.Split(stripWhiteSpace(v.GetString(configPrefix+suffixBrokers)), ","), + AuthenticationConfig: authenticationOptions, } opt.topic = v.GetString(configPrefix + suffixTopic) opt.encoding = v.GetString(configPrefix + suffixEncoding)