Skip to content

Commit

Permalink
api(ticdc): parse the kafka config (#9432)
Browse files Browse the repository at this point in the history
close #8865
  • Loading branch information
Rustin170506 authored Jul 26, 2023
1 parent aa97333 commit c21fbee
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 0 deletions.
37 changes: 37 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,18 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
EnablePartitionSeparator: c.Sink.EnablePartitionSeparator,
FileIndexWidth: c.Sink.FileIndexWidth,
}

if c.Sink.KafkaConfig != nil {
res.Sink.KafkaConfig = &config.KafkaConfig{
SASLMechanism: c.Sink.KafkaConfig.SASLMechanism,
SASLOAuthClientID: c.Sink.KafkaConfig.SASLOAuthClientID,
SASLOAuthClientSecret: c.Sink.KafkaConfig.SASLOAuthClientSecret,
SASLOAuthTokenURL: c.Sink.KafkaConfig.SASLOAuthTokenURL,
SASLOAuthScopes: c.Sink.KafkaConfig.SASLOAuthScopes,
SASLOAuthGrantType: c.Sink.KafkaConfig.SASLOAuthGrantType,
SASLOAuthAudience: c.Sink.KafkaConfig.SASLOAuthAudience,
}
}
}
if c.Mounter != nil {
res.Mounter = &config.MounterConfig{
Expand Down Expand Up @@ -386,6 +398,18 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
EnablePartitionSeparator: cloned.Sink.EnablePartitionSeparator,
FileIndexWidth: cloned.Sink.FileIndexWidth,
}

if cloned.Sink.KafkaConfig != nil {
res.Sink.KafkaConfig = &KafkaConfig{
SASLMechanism: cloned.Sink.KafkaConfig.SASLMechanism,
SASLOAuthClientID: cloned.Sink.KafkaConfig.SASLOAuthClientID,
SASLOAuthClientSecret: cloned.Sink.KafkaConfig.SASLOAuthClientSecret,
SASLOAuthTokenURL: cloned.Sink.KafkaConfig.SASLOAuthTokenURL,
SASLOAuthScopes: cloned.Sink.KafkaConfig.SASLOAuthScopes,
SASLOAuthGrantType: cloned.Sink.KafkaConfig.SASLOAuthGrantType,
SASLOAuthAudience: cloned.Sink.KafkaConfig.SASLOAuthAudience,
}
}
}
if cloned.Consistent != nil {
res.Consistent = &ConsistentConfig{
Expand Down Expand Up @@ -515,6 +539,19 @@ type SinkConfig struct {
DateSeparator string `json:"date_separator"`
EnablePartitionSeparator bool `json:"enable_partition_separator"`
FileIndexWidth int `json:"file_index_width"`
KafkaConfig *KafkaConfig `json:"kafka_config"`
}

// KafkaConfig represents kafka config for a changefeed.
// This is a duplicate of config.KafkaConfig
type KafkaConfig struct {
SASLMechanism *string `json:"sasl_mechanism,omitempty"`
SASLOAuthClientID *string `json:"sasl_oauth_client_id,omitempty"`
SASLOAuthClientSecret *string `json:"sasl_oauth_client_secret,omitempty"`
SASLOAuthTokenURL *string `json:"sasl_oauth_token_url,omitempty"`
SASLOAuthScopes []string `json:"sasl_oauth_scopes,omitempty"`
SASLOAuthGrantType *string `json:"sasl_oauth_grant_type,omitempty"`
SASLOAuthAudience *string `json:"sasl_oauth_audience,omitempty"`
}

// CSVConfig denotes the csv config
Expand Down
32 changes: 32 additions & 0 deletions docs/swagger/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1858,6 +1858,35 @@ var doc = `{
}
}
},
"v2.KafkaConfig": {
"type": "object",
"properties": {
"sasl_mechanism": {
"type": "string"
},
"sasl_oauth_audience": {
"type": "string"
},
"sasl_oauth_client_id": {
"type": "string"
},
"sasl_oauth_client_secret": {
"type": "string"
},
"sasl_oauth_grant_type": {
"type": "string"
},
"sasl_oauth_scopes": {
"type": "array",
"items": {
"type": "string"
}
},
"sasl_oauth_token_url": {
"type": "string"
}
}
},
"v2.LogLevelReq": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -2020,6 +2049,9 @@ var doc = `{
"file_index_width": {
"type": "integer"
},
"kafka_config": {
"$ref": "#/definitions/v2.KafkaConfig"
},
"protocol": {
"type": "string"
},
Expand Down
32 changes: 32 additions & 0 deletions docs/swagger/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -1839,6 +1839,35 @@
}
}
},
"v2.KafkaConfig": {
"type": "object",
"properties": {
"sasl_mechanism": {
"type": "string"
},
"sasl_oauth_audience": {
"type": "string"
},
"sasl_oauth_client_id": {
"type": "string"
},
"sasl_oauth_client_secret": {
"type": "string"
},
"sasl_oauth_grant_type": {
"type": "string"
},
"sasl_oauth_scopes": {
"type": "array",
"items": {
"type": "string"
}
},
"sasl_oauth_token_url": {
"type": "string"
}
}
},
"v2.LogLevelReq": {
"type": "object",
"properties": {
Expand Down Expand Up @@ -2001,6 +2030,9 @@
"file_index_width": {
"type": "integer"
},
"kafka_config": {
"$ref": "#/definitions/v2.KafkaConfig"
},
"protocol": {
"type": "string"
},
Expand Down
21 changes: 21 additions & 0 deletions docs/swagger/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,25 @@ definitions:
type: string
type: array
type: object
v2.KafkaConfig:
properties:
sasl_mechanism:
type: string
sasl_oauth_audience:
type: string
sasl_oauth_client_id:
type: string
sasl_oauth_client_secret:
type: string
sasl_oauth_grant_type:
type: string
sasl_oauth_scopes:
items:
type: string
type: array
sasl_oauth_token_url:
type: string
type: object
v2.LogLevelReq:
properties:
log_level:
Expand Down Expand Up @@ -559,6 +578,8 @@ definitions:
type: integer
file_index_width:
type: integer
kafka_config:
$ref: '#/definitions/v2.KafkaConfig'
protocol:
type: string
schema_registry:
Expand Down

0 comments on commit c21fbee

Please sign in to comment.