Skip to content

Commit

Permalink
Add TLS dialer support for kafka client (#12)
Browse files Browse the repository at this point in the history
so it can talk to TLS enabled bufstream
  • Loading branch information
alapienebuf authored Oct 5, 2024
1 parent cb2ccb7 commit 90309fd
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
8 changes: 8 additions & 0 deletions pkg/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func parseConfig() (Config, error) {
config := Config{
Kafka: kafka.Config{
BootstrapServers: []string{"0.0.0.0:9092"},
RootCAPath: "",
Topic: "email-updated",
Group: "email-verifier",
ClientID: "bufstream-demo",
Expand Down Expand Up @@ -104,6 +105,13 @@ func parseConfig() (Config, error) {
config.CSR.Password,
"The Confluent Schema Registry password/token, if authentication is needed.",
)
flagSet.StringVarP(
&config.Kafka.RootCAPath,
"tls-root-ca-path",
"",
config.Kafka.RootCAPath,
"A path to root CA certificate for kafka TLS.",
)
if err := flagSet.Parse(os.Args[1:]); err != nil {
return Config{}, err
}
Expand Down
41 changes: 39 additions & 2 deletions pkg/kafka/kafka.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package kafka

import (
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"os"
"time"

"github.com/twmb/franz-go/pkg/kgo"
Expand All @@ -16,19 +21,51 @@ type Config struct {
//
// TODO: A good explanation and links to docs as to what this does.
BootstrapServers []string
RootCAPath string
Group string
Topic string
ClientID string
}

// NewKafkaClient returns a new franz-go Kafka Client for the given Config.
func NewKafkaClient(config Config) (*kgo.Client, error) {
return kgo.NewClient(
opts := []kgo.Opt{
kgo.SeedBrokers(config.BootstrapServers...),
kgo.ConsumerGroup(config.Group),
kgo.ConsumeTopics(config.Topic),
kgo.ClientID(config.ClientID),
kgo.AllowAutoTopicCreation(),
kgo.FetchMaxWait(time.Second),
)
}

if config.RootCAPath != "" {
dialerTLSConfig, err := buildDialerTLSConfig(config.RootCAPath)
if err != nil {
return nil, fmt.Errorf("build dial tls config: %w", err)
}

opts = append(opts, kgo.DialTLSConfig(dialerTLSConfig))
}

return kgo.NewClient(opts...)
}

func buildDialerTLSConfig(rootCAPath string) (*tls.Config, error) {
pool := x509.NewCertPool()

caCert, err := os.ReadFile(rootCAPath)
if err != nil {
return nil, err
}

if !pool.AppendCertsFromPEM(caCert) {
return nil, errors.New("parse CA cert failed")
}

tlsCfg := &tls.Config{
MinVersion: tls.VersionTLS12,
RootCAs: pool,
}

return tlsCfg, nil
}

0 comments on commit 90309fd

Please sign in to comment.