diff --git a/pkg/app/app.go b/pkg/app/app.go index 0fcf074..13e3917 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -50,6 +50,7 @@ func parseConfig() (Config, error) { config := Config{ Kafka: kafka.Config{ BootstrapServers: []string{"0.0.0.0:9092"}, + RootCAPath: "", Topic: "email-updated", Group: "email-verifier", ClientID: "bufstream-demo", @@ -104,6 +105,13 @@ func parseConfig() (Config, error) { config.CSR.Password, "The Confluent Schema Registry password/token, if authentication is needed.", ) + flagSet.StringVarP( + &config.Kafka.RootCAPath, + "tls-root-ca-path", + "", + config.Kafka.RootCAPath, + "A path to root CA certificate for kafka TLS.", + ) if err := flagSet.Parse(os.Args[1:]); err != nil { return Config{}, err } diff --git a/pkg/kafka/kafka.go b/pkg/kafka/kafka.go index 07bd15b..5c38380 100644 --- a/pkg/kafka/kafka.go +++ b/pkg/kafka/kafka.go @@ -1,6 +1,11 @@ package kafka import ( + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "os" "time" "github.com/twmb/franz-go/pkg/kgo" @@ -16,6 +21,7 @@ type Config struct { // // TODO: A good explanation and links to docs as to what this does. BootstrapServers []string + RootCAPath string Group string Topic string ClientID string @@ -23,12 +29,43 @@ type Config struct { // NewKafkaClient returns a new franz-go Kafka Client for the given Config. func NewKafkaClient(config Config) (*kgo.Client, error) { - return kgo.NewClient( + opts := []kgo.Opt{ kgo.SeedBrokers(config.BootstrapServers...), kgo.ConsumerGroup(config.Group), kgo.ConsumeTopics(config.Topic), kgo.ClientID(config.ClientID), kgo.AllowAutoTopicCreation(), kgo.FetchMaxWait(time.Second), - ) + } + + if config.RootCAPath != "" { + dialerTLSConfig, err := buildDialerTLSConfig(config.RootCAPath) + if err != nil { + return nil, fmt.Errorf("build dial tls config: %w", err) + } + + opts = append(opts, kgo.DialTLSConfig(dialerTLSConfig)) + } + + return kgo.NewClient(opts...) +} + +func buildDialerTLSConfig(rootCAPath string) (*tls.Config, error) { + pool := x509.NewCertPool() + + caCert, err := os.ReadFile(rootCAPath) + if err != nil { + return nil, err + } + + if !pool.AppendCertsFromPEM(caCert) { + return nil, errors.New("parse CA cert failed") + } + + tlsCfg := &tls.Config{ + MinVersion: tls.VersionTLS12, + RootCAs: pool, + } + + return tlsCfg, nil }