Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Enable Kafka TLS when TLS auth is specified #2107

Merged
merged 7 commits into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/grpc/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const (

var tlsFlagsConfig = tlscfg.ClientFlagsConfig{
Prefix: gRPCPrefix,
ShowEnabled: true,
Enabled: tlscfg.Show,
ShowServerName: true,
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/builder_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const (

var tlsFlagsConfig = tlscfg.ServerFlagsConfig{
Prefix: "collector.grpc",
ShowEnabled: true,
ShowEnabled: tlscfg.Show,
ShowClientCA: true,
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/ingester/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/kafka/auth"
kafkaConsumer "github.com/jaegertracing/jaeger/pkg/kafka/consumer"
Expand Down Expand Up @@ -114,7 +115,7 @@ func AddFlags(flagSet *flag.FlagSet) {
}

// InitFromViper initializes Builder with properties from viper
func (o *Options) InitFromViper(v *viper.Viper) {
func (o *Options) InitFromViper(v *viper.Viper, logger *zap.Logger) {
o.Brokers = strings.Split(stripWhiteSpace(v.GetString(KafkaConsumerConfigPrefix+SuffixBrokers)), ",")
o.Topic = v.GetString(KafkaConsumerConfigPrefix + SuffixTopic)
o.GroupID = v.GetString(KafkaConsumerConfigPrefix + SuffixGroupID)
Expand All @@ -126,6 +127,7 @@ func (o *Options) InitFromViper(v *viper.Viper) {
o.DeadlockInterval = v.GetDuration(ConfigPrefix + SuffixDeadlockInterval)
authenticationOptions := auth.AuthenticationConfig{}
authenticationOptions.InitFromViper(KafkaConsumerConfigPrefix, v)
authenticationOptions.Normalize(logger)
o.AuthenticationConfig = authenticationOptions
}

Expand Down
48 changes: 46 additions & 2 deletions cmd/ingester/app/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
package app

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/kafka/auth"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
)

Expand All @@ -37,7 +42,7 @@ func TestOptionsWithFlags(t *testing.T) {
"--ingester.parallelism=5",
"--ingester.deadlockInterval=2m",
})
o.InitFromViper(v)
o.InitFromViper(v, zap.NewNop())

assert.Equal(t, "topic1", o.Topic)
assert.Equal(t, []string{"127.0.0.1:9092", "0.0.0:1234"}, o.Brokers)
Expand All @@ -49,11 +54,50 @@ func TestOptionsWithFlags(t *testing.T) {
assert.Equal(t, kafka.EncodingJSON, o.Encoding)
}

func TestTLSFlags(t *testing.T) {
tests := []struct {
flags []string
expected auth.AuthenticationConfig
}{
{
flags: []string{},
expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"}},
pavolloffay marked this conversation as resolved.
Show resolved Hide resolved
},
{
flags: []string{"--kafka.consumer.authentication=foo"},
expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"}},
},
{
flags: []string{"--kafka.consumer.authentication=kerberos", "--kafka.consumer.tls.enabled=true"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"}, TLS: tlscfg.Options{Enabled: true}},
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is Authentication: "tls" here?

},
{
flags: []string{"--kafka.consumer.authentication=tls"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"}, TLS: tlscfg.Options{Enabled: true}},
},
{
flags: []string{"--kafka.consumer.authentication=tls", "--kafka.consumer.tls.enabled=false"},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that if someone explicitly set tls.enabled to false, this will be silently changed to true`. I have nothing against that, but perhaps there's a way to detect whether this value was explicitly provided and keep whatever the user set?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can be done by changing the enabled property to a pointer. I am not sure if there is much value in it. Some signatures will have to be also changed to accept the logger.

More controversial is this setting

// --kafka.consumer.authentication=kerberos",
--kafka.consumer.tls.enabled=true"

Somebody omitting the auth type and specifying tls.enabled=true.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yurishkuro any thoughts on this? I don't have a strong opinion whether kafka.consumer.tls.enabled=true should set kafka.consumer.authentication=tls.

Copy link
Contributor

@jpkrohling jpkrohling Mar 2, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From Gitter, we just saw a case where a user wants TLS encryption between the agent and collector without the auth parts. kafka.producer.tls.enabled=true + kafka.producer.authentication=none is a valid combination.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, it's confusing for people to have both kafka.producer.tls.enabled and kafka.producer.authentication they will probably tend to forget the second one.

I have updated the PR to set tls auth when .tls.enabled is true.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have updated the PR to set tls auth when .tls.enabled is true.

It's the opposite. tls.enabled does not imply authentication=tls. It's perfectly possible to have authentication=none but the traffic be encrypted using TLS.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jpkrohling I am not sure if I understand you...

It's perfectly possible to have authentication=none but the traffic be encrypted using TLS.

This is exactly what the PR does.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I misunderstood you, you are absolutely correct :-)

expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"}, TLS: tlscfg.Options{Enabled: true}},
},
}

for _, test := range tests {
t.Run(fmt.Sprintf("%s", test.flags), func(t *testing.T) {
o := &Options{}
v, command := config.Viperize(AddFlags)
err := command.ParseFlags(test.flags)
require.NoError(t, err)
o.InitFromViper(v, zap.NewNop())
assert.Equal(t, test.expected, o.AuthenticationConfig)
})
}
}

func TestFlagDefaults(t *testing.T) {
o := &Options{}
v, command := config.Viperize(AddFlags)
command.ParseFlags([]string{})
o.InitFromViper(v)
o.InitFromViper(v, zap.NewNop())

assert.Equal(t, DefaultTopic, o.Topic)
assert.Equal(t, []string{DefaultBroker}, o.Brokers)
Expand Down
2 changes: 1 addition & 1 deletion cmd/ingester/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func main() {
}

options := app.Options{}
options.InitFromViper(v)
options.InitFromViper(v, logger)
consumer, err := builder.CreateConsumer(logger, metricsFactory, spanWriter, options)
if err != nil {
logger.Fatal("Unable to create consumer", zap.Error(err))
Expand Down
32 changes: 24 additions & 8 deletions pkg/config/tlscfg/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,36 @@ const (
tlsSkipHostVerify = tlsPrefix + ".skip-host-verify"
)

type Enabled int

const (
Hide Enabled = iota
Show
ShowDeprecated
)

// ClientFlagsConfig describes which CLI flags for TLS client should be generated.
type ClientFlagsConfig struct {
Prefix string
ShowEnabled bool
Enabled Enabled
ShowServerName bool
}

// ServerFlagsConfig describes which CLI flags for TLS server should be generated.
type ServerFlagsConfig struct {
Prefix string
ShowEnabled bool
ShowEnabled Enabled
ShowClientCA bool
}

// AddFlags adds flags for TLS to the FlagSet.
func (c ClientFlagsConfig) AddFlags(flags *flag.FlagSet) {
if c.ShowEnabled {
flags.Bool(c.Prefix+tlsEnabled, false, "Enable TLS when talking to the remote server(s)")
if c.Enabled >= Show {
deprecated := ""
if c.Enabled == ShowDeprecated {
deprecated = "(deprecated) "
}
flags.Bool(c.Prefix+tlsEnabled, false, deprecated+"Enable TLS when talking to the remote server(s)")
flags.Bool(c.Prefix+tlsEnabledOld, false, "(deprecated) see --"+c.Prefix+tlsEnabled)
}
flags.String(c.Prefix+tlsCA, "", "Path to a TLS CA (Certification Authority) file used to verify the remote server(s) (by default will use the system truststore)")
Expand All @@ -64,8 +76,12 @@ func (c ClientFlagsConfig) AddFlags(flags *flag.FlagSet) {

// AddFlags adds flags for TLS to the FlagSet.
func (c ServerFlagsConfig) AddFlags(flags *flag.FlagSet) {
if c.ShowEnabled {
flags.Bool(c.Prefix+tlsEnabled, false, "Enable TLS on the server")
if c.ShowEnabled >= Show {
deprecated := ""
if c.ShowEnabled == ShowDeprecated {
deprecated = "(deprecated) "
}
flags.Bool(c.Prefix+tlsEnabled, false, deprecated+"Enable TLS on the server")
flags.Bool(c.Prefix+tlsEnabledOld, false, "(deprecated) see --"+c.Prefix+tlsEnabled)
}
flags.String(c.Prefix+tlsCert, "", "Path to a TLS Certificate file, used to identify this server to clients")
Expand All @@ -77,7 +93,7 @@ func (c ServerFlagsConfig) AddFlags(flags *flag.FlagSet) {
// InitFromViper creates tls.Config populated with values retrieved from Viper.
func (c ClientFlagsConfig) InitFromViper(v *viper.Viper) Options {
var p Options
if c.ShowEnabled {
if c.Enabled >= Show {
p.Enabled = v.GetBool(c.Prefix + tlsEnabled)

if !p.Enabled {
Expand All @@ -97,7 +113,7 @@ func (c ClientFlagsConfig) InitFromViper(v *viper.Viper) Options {
// InitFromViper creates tls.Config populated with values retrieved from Viper.
func (c ServerFlagsConfig) InitFromViper(v *viper.Viper) Options {
var p Options
if c.ShowEnabled {
if c.ShowEnabled >= Show {
p.Enabled = v.GetBool(c.Prefix + tlsEnabled)

if !p.Enabled {
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/tlscfg/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestClientFlags(t *testing.T) {
flagSet := &flag.FlagSet{}
flagCfg := ClientFlagsConfig{
Prefix: "prefix",
ShowEnabled: true,
Enabled: Show,
ShowServerName: true,
}
flagCfg.AddFlags(flagSet)
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestServerFlags(t *testing.T) {
flagSet := &flag.FlagSet{}
flagCfg := ServerFlagsConfig{
Prefix: "prefix",
ShowEnabled: true,
ShowEnabled: Show,
ShowClientCA: true,
}
flagCfg.AddFlags(flagSet)
Expand Down
14 changes: 13 additions & 1 deletion pkg/kafka/auth/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/Shopify/sarama"
"github.com/spf13/viper"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
)
Expand Down Expand Up @@ -80,7 +81,7 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper.

var tlsClientConfig = tlscfg.ClientFlagsConfig{
Prefix: configPrefix,
ShowEnabled: true,
Enabled: tlscfg.ShowDeprecated,
ShowServerName: true,
}

Expand All @@ -89,3 +90,14 @@ func (config *AuthenticationConfig) InitFromViper(configPrefix string, v *viper.
config.PlainText.UserName = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextUserName)
config.PlainText.Password = v.GetString(configPrefix + plainTextPrefix + suffixPlainTextPassword)
}

// Normalize normalizes kafka options
func (config *AuthenticationConfig) Normalize(logger *zap.Logger) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to add to split InitFromViper and Normalize. InitFromViper is called via storage factory and it does not have access to the logger when doing the call.

if config.TLS.Enabled == true {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in L83, can we change ShowEnabled to false?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will break jaeger-operator and clients which adopted 1.17

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, but do we really want to keep this duality? We can extend the config to render tls.enabled as deprecated, and add warnings when used.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can try to do that, I don't like the current design either.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought @jpkrohling had mentioned a required use case where TLS could be enabled, but authentication set to none or something other than tls?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have created an issue to remove them in the next release #2111

Copy link
Member Author

@pavolloffay pavolloffay Mar 3, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@objectiser yes, the current flags are misleading for people because there is overlap betwen:

--kafka.consumer.authentication
--kafka.consumer.tls.enabled

The goal is to remove the second one.

logger.Warn("Flag .tls.enabled is deprecated use " + suffixAuthentication + " instead.")
config.Authentication = tls
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is correct. Other modes can be compatible with TLS according to @jpkrohling, so just having tls.enabled should not change the auth scheme.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed this and made change to pkg/kafka/auth/config.go to load TLS when tls.enabled=true

}
if config.Authentication == tls {
config.TLS.Enabled = true
}
}
2 changes: 1 addition & 1 deletion pkg/kafka/auth/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func AddFlags(configPrefix string, flagSet *flag.FlagSet) {

tlsClientConfig := tlscfg.ClientFlagsConfig{
Prefix: configPrefix,
ShowEnabled: true,
Enabled: tlscfg.ShowDeprecated,
ShowServerName: true,
}
tlsClientConfig.AddFlags(flagSet)
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/cassandra/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (opt *Options) InitFromViper(v *viper.Viper) {
func tlsFlagsConfig(namespace string) tlscfg.ClientFlagsConfig {
return tlscfg.ClientFlagsConfig{
Prefix: namespace,
ShowEnabled: true,
Enabled: tlscfg.Show,
ShowServerName: true,
}
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/es/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func NewOptions(primaryNamespace string, otherNamespaces ...string) *Options {
func (config *namespaceConfig) getTLSFlagsConfig() tlscfg.ClientFlagsConfig {
return tlscfg.ClientFlagsConfig{
Prefix: config.namespace,
ShowEnabled: true,
Enabled: tlscfg.Show,
ShowServerName: true,
}
}
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (s *KafkaIntegrationTestSuite) initialize() error {
return err
}
options := app.Options{}
options.InitFromViper(v)
options.InitFromViper(v, s.logger)
traceStore := memory.NewStore()
spanConsumer, err := builder.CreateConsumer(s.logger, metrics.NullFactory, traceStore, options)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/kafka/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package kafka
import (
"errors"
"flag"

"github.com/Shopify/sarama"
"github.com/spf13/viper"
"github.com/uber/jaeger-lib/metrics"
Expand Down Expand Up @@ -62,6 +61,7 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
logger.Info("Kafka factory",
zap.Any("producer builder", f.Builder),
zap.Any("topic", f.options.topic))
f.options.config.Normalize(logger)
p, err := f.NewProducer()
if err != nil {
return err
Expand Down
45 changes: 45 additions & 0 deletions plugin/storage/kafka/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
package kafka

import (
"fmt"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/kafka/auth"
)

func TestOptionsWithFlags(t *testing.T) {
Expand Down Expand Up @@ -164,3 +168,44 @@ func TestRequiredAcksFailures(t *testing.T) {
_, err := getRequiredAcks("test")
assert.Error(t, err)
}

func TestTLSFlags(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests are duplicated. Don't we have Kafka flags parsed by a packaged shared between ingester and storage?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both tests are in different packages, there is no shared test class.

tests := []struct {
flags []string
expected auth.AuthenticationConfig
}{
{
flags: []string{},
expected: auth.AuthenticationConfig{Authentication: "none", Kerberos: auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"}},
},
{
flags: []string{"--kafka.producer.authentication=foo"},
expected: auth.AuthenticationConfig{Authentication: "foo", Kerberos: auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"}},
},
{
flags: []string{"--kafka.producer.authentication=kerberos", "--kafka.producer.tls.enabled=true"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"}, TLS: tlscfg.Options{Enabled: true}},
},
{
flags: []string{"--kafka.producer.authentication=tls"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"}, TLS: tlscfg.Options{Enabled: true}},
},
{
flags: []string{"--kafka.producer.authentication=tls", "--kafka.producer.tls.enabled=false"},
expected: auth.AuthenticationConfig{Authentication: "tls", Kerberos: auth.KerberosConfig{ServiceName: "kafka", ConfigPath: "/etc/krb5.conf", KeyTabPath: "/etc/security/kafka.keytab"}, TLS: tlscfg.Options{Enabled: true}},
},
}

for _, test := range tests {
t.Run(fmt.Sprintf("%s", test.flags), func(t *testing.T) {
o := &Options{}
v, command := config.Viperize(o.AddFlags)
err := command.ParseFlags(test.flags)
require.NoError(t, err)
o.InitFromViper(v)
o.config.Normalize(zap.NewNop())
assert.Equal(t, test.expected, o.config.AuthenticationConfig)

})
}
}