Skip to content

Commit

Permalink
pkg/sink(ticdc): add oauth support for sarama Kafka sink (#8938) (#9095)
Browse files Browse the repository at this point in the history
close #8865
  • Loading branch information
ti-chi-bot authored May 30, 2023
1 parent 0a60f07 commit 8ca433f
Show file tree
Hide file tree
Showing 13 changed files with 550 additions and 40 deletions.
2 changes: 1 addition & 1 deletion cdc/sink/mq/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func NewKafkaSaramaSink(ctx context.Context, sinkURI *url.URL,
}

baseConfig := kafka.NewConfig()
if err := baseConfig.Apply(sinkURI); err != nil {
if err := baseConfig.Apply(sinkURI, replicaConfig); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}

Expand Down
90 changes: 84 additions & 6 deletions cdc/sink/mq/producer/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kafka
import (
"context"
"crypto/tls"
"encoding/base64"
"net/url"
"strconv"
"strings"
Expand Down Expand Up @@ -103,8 +104,8 @@ func (c *Config) setPartitionNum(realPartitionCount int32) error {
return nil
}

// Apply the sinkURI to update Config
func (c *Config) Apply(sinkURI *url.URL) error {
// Apply the configuration to the sarama producer.
func (c *Config) Apply(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) error {
c.BrokerEndpoints = strings.Split(sinkURI.Host, ",")
params := sinkURI.Query()
s := params.Get("partition-num")
Expand Down Expand Up @@ -185,7 +186,7 @@ func (c *Config) Apply(sinkURI *url.URL) error {
c.ReadTimeout = a
}

err := c.applySASL(params)
err := c.applySASL(params, replicaConfig)
if err != nil {
return err
}
Expand Down Expand Up @@ -256,7 +257,7 @@ func (c *Config) applyTLS(params url.Values) error {
return nil
}

func (c *Config) applySASL(params url.Values) error {
func (c *Config) applySASL(params url.Values, replicaConfig *config.ReplicaConfig) error {
s := params.Get("sasl-user")
if s != "" {
c.SASL.SASLUser = s
Expand All @@ -274,6 +275,12 @@ func (c *Config) applySASL(params url.Values) error {
return cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
c.SASL.SASLMechanism = mechanism
} else if replicaConfig != nil && replicaConfig.Sink != nil && replicaConfig.Sink.KafkaConfig != nil && replicaConfig.Sink.KafkaConfig.SASLMechanism != nil {
mechanism, err := security.SASLMechanismFromString(*replicaConfig.Sink.KafkaConfig.SASLMechanism)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaInvalidConfig, err)
}
c.SASL.SASLMechanism = mechanism
}

s = params.Get("sasl-gssapi-auth-type")
Expand Down Expand Up @@ -324,6 +331,67 @@ func (c *Config) applySASL(params url.Values) error {
c.SASL.GSSAPI.DisablePAFXFAST = disablePAFXFAST
}

if replicaConfig.Sink != nil && replicaConfig.Sink.KafkaConfig != nil {
if replicaConfig.Sink.KafkaConfig.SASLOAuthClientID != nil {
clientID := *replicaConfig.Sink.KafkaConfig.SASLOAuthClientID
if clientID == "" {
return cerror.ErrKafkaInvalidConfig.GenWithStack("OAuth2 client ID cannot be empty")
}
c.SASL.OAuth2.ClientID = clientID
}

if replicaConfig.Sink.KafkaConfig.SASLOAuthClientSecret != nil {
clientSecret := *replicaConfig.Sink.KafkaConfig.SASLOAuthClientSecret
if clientSecret == "" {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
"OAuth2 client secret cannot be empty")
}

// BASE64 decode the client secret
decodedClientSecret, err := base64.StdEncoding.DecodeString(clientSecret)
if err != nil {
log.Error("OAuth2 client secret is not base64 encoded", zap.Error(err))
return cerror.ErrKafkaInvalidConfig.GenWithStack(
"OAuth2 client secret is not base64 encoded")
}
c.SASL.OAuth2.ClientSecret = string(decodedClientSecret)
}

if replicaConfig.Sink.KafkaConfig.SASLOAuthTokenURL != nil {
tokenURL := *replicaConfig.Sink.KafkaConfig.SASLOAuthTokenURL
if tokenURL == "" {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
"OAuth2 token URL cannot be empty")
}
c.SASL.OAuth2.TokenURL = tokenURL
}

if c.SASL.OAuth2.IsEnable() {
if c.SASL.SASLMechanism != security.OAuthMechanism {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
"OAuth2 is only supported with SASL mechanism type OAUTHBEARER, but got %s",
c.SASL.SASLMechanism)
}

if err := c.SASL.OAuth2.Validate(); err != nil {
return cerror.ErrKafkaInvalidConfig.Wrap(err)
}
c.SASL.OAuth2.SetDefault()
}

if replicaConfig.Sink.KafkaConfig.SASLOAuthScopes != nil {
c.SASL.OAuth2.Scopes = replicaConfig.Sink.KafkaConfig.SASLOAuthScopes
}

if replicaConfig.Sink.KafkaConfig.SASLOAuthGrantType != nil {
c.SASL.OAuth2.GrantType = *replicaConfig.Sink.KafkaConfig.SASLOAuthGrantType
}

if replicaConfig.Sink.KafkaConfig.SASLOAuthAudience != nil {
c.SASL.OAuth2.Audience = *replicaConfig.Sink.KafkaConfig.SASLOAuthAudience
}
}

return nil
}

Expand Down Expand Up @@ -445,12 +513,14 @@ func NewSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config.Net.TLS.Config.InsecureSkipVerify = c.InsecureSkipVerify
}

completeSaramaSASLConfig(config, c)
if err := completeSaramaSASLConfig(ctx, config, c); err != nil {
return nil, errors.Trace(err)
}

return config, err
}

func completeSaramaSASLConfig(config *sarama.Config, c *Config) {
func completeSaramaSASLConfig(ctx context.Context, config *sarama.Config, c *Config) error {
if c.SASL != nil && c.SASL.SASLMechanism != "" {
config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLMechanism(c.SASL.SASLMechanism)
Expand Down Expand Up @@ -480,6 +550,14 @@ func completeSaramaSASLConfig(config *sarama.Config, c *Config) {
case security.KeyTabAuth:
config.Net.SASL.GSSAPI.KeyTabPath = c.SASL.GSSAPI.KeyTabPath
}
case sarama.SASLTypeOAuth:
p, err := newTokenProvider(ctx, c)
if err != nil {
return errors.Trace(err)
}
config.Net.SASL.TokenProvider = p
}

}
return nil
}
Loading

0 comments on commit 8ca433f

Please sign in to comment.