Skip to content

Commit

Permalink
Merge pull request #412 from devsbb/kafka-sasl-plain-support
Browse files Browse the repository at this point in the history
Add support for kafka auth using SASL with PLAIN mechanism
  • Loading branch information
zhouzhuojie authored Oct 26, 2020
2 parents 3b50881 + 88b13a2 commit 98831a8
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 2 deletions.
3 changes: 3 additions & 0 deletions pkg/config/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ var Config = struct {
RecorderKafkaKeyFile string `env:"FLAGR_RECORDER_KAFKA_KEYFILE" envDefault:""`
RecorderKafkaCAFile string `env:"FLAGR_RECORDER_KAFKA_CAFILE" envDefault:""`
RecorderKafkaVerifySSL bool `env:"FLAGR_RECORDER_KAFKA_VERIFYSSL" envDefault:"false"`
RecorderKafkaSimpleSSL bool `env:"FLAGR_RECORDER_KAFKA_SIMPLE_SSL" envDefault:"false"`
RecorderKafkaSASLUsername string `env:"FLAGR_RECORDER_KAFKA_SASL_USERNAME" envDefault:""`
RecorderKafkaSASLPassword string `env:"FLAGR_RECORDER_KAFKA_SASL_PASSWORD" envDefault:""`
RecorderKafkaVerbose bool `env:"FLAGR_RECORDER_KAFKA_VERBOSE" envDefault:"true"`
RecorderKafkaTopic string `env:"FLAGR_RECORDER_KAFKA_TOPIC" envDefault:"flagr-records"`
RecorderKafkaRetryMax int `env:"FLAGR_RECORDER_KAFKA_RETRYMAX" envDefault:"5"`
Expand Down
16 changes: 15 additions & 1 deletion pkg/handler/data_recorder_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,19 @@ var NewKafkaRecorder = func() DataRecorder {
config.Config.RecorderKafkaKeyFile,
config.Config.RecorderKafkaCAFile,
config.Config.RecorderKafkaVerifySSL,
config.Config.RecorderKafkaSimpleSSL,
)
if tlscfg != nil {
cfg.Net.TLS.Enable = true
cfg.Net.TLS.Config = tlscfg
}

if config.Config.RecorderKafkaSASLUsername != "" && config.Config.RecorderKafkaSASLPassword != "" {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.User = config.Config.RecorderKafkaSASLUsername
cfg.Net.SASL.Password = config.Config.RecorderKafkaSASLPassword
}

cfg.Producer.RequiredAcks = sarama.WaitForLocal
cfg.Producer.Retry.Max = config.Config.RecorderKafkaRetryMax
cfg.Producer.Flush.Frequency = config.Config.RecorderKafkaFlushFrequency
Expand Down Expand Up @@ -77,7 +85,7 @@ var NewKafkaRecorder = func() DataRecorder {
}
}

func createTLSConfiguration(certFile string, keyFile string, caFile string, verifySSL bool) (t *tls.Config) {
func createTLSConfiguration(certFile string, keyFile string, caFile string, verifySSL bool, simpleSSL bool) (t *tls.Config) {
if certFile != "" && keyFile != "" && caFile != "" {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
Expand All @@ -98,6 +106,12 @@ func createTLSConfiguration(certFile string, keyFile string, caFile string, veri
InsecureSkipVerify: !verifySSL,
}
}

if simpleSSL {
t = &tls.Config{
InsecureSkipVerify: !verifySSL,
}
}
// will be nil by default if nothing is provided
return t
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/handler/data_recorder_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestCreateTLSConfiguration(t *testing.T) {
"./testdata/certificates/alice.key",
"./testdata/certificates/ca.crt",
true,
false,
)
assert.NotZero(t, tlsConfig)

Expand All @@ -50,8 +51,18 @@ func TestCreateTLSConfiguration(t *testing.T) {
"",
"",
true,
false,
)
assert.Zero(t, tlsConfig)

tlsConfig = createTLSConfiguration(
"",
"",
"",
true,
true,
)
assert.NotZero(t, tlsConfig)
})

t.Run("cert or key file not found", func(t *testing.T) {
Expand All @@ -61,6 +72,7 @@ func TestCreateTLSConfiguration(t *testing.T) {
"./testdata/certificates/not_found.key",
"./testdata/certificates/ca.crt",
true,
false,
)
})
})
Expand All @@ -72,6 +84,7 @@ func TestCreateTLSConfiguration(t *testing.T) {
"./testdata/certificates/alice.key",
"./testdata/certificates/not_found.crt",
true,
false,
)
})
})
Expand Down
3 changes: 2 additions & 1 deletion pkg/handler/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"math/rand"
"os"
"path"

"github.com/checkr/flagr/pkg/entity"
"github.com/checkr/flagr/swagger_gen/restapi/operations/export"
Expand All @@ -25,7 +26,7 @@ var exportSQLiteHandler = func(p export.GetExportSqliteParams) middleware.Respon
}

var exportSQLiteFile = func(excludeSnapshots *bool) (file io.ReadCloser, done func(), err error) {
fname := fmt.Sprintf("/tmp/flagr_%d.sqlite", rand.Int31())
fname := path.Join(os.TempDir(), fmt.Sprintf("flagr_%d.sqlite", rand.Int31()))
done = func() {
os.Remove(fname)
logrus.WithField("file", fname).Debugf("removing the tmp file")
Expand Down

0 comments on commit 98831a8

Please sign in to comment.