diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index d669d2a7d58..457eb3215b0 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -281,6 +281,18 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( EnablePartitionSeparator: c.Sink.EnablePartitionSeparator, FileIndexWidth: c.Sink.FileIndexWidth, } + + if c.Sink.KafkaConfig != nil { + res.Sink.KafkaConfig = &config.KafkaConfig{ + SASLMechanism: c.Sink.KafkaConfig.SASLMechanism, + SASLOAuthClientID: c.Sink.KafkaConfig.SASLOAuthClientID, + SASLOAuthClientSecret: c.Sink.KafkaConfig.SASLOAuthClientSecret, + SASLOAuthTokenURL: c.Sink.KafkaConfig.SASLOAuthTokenURL, + SASLOAuthScopes: c.Sink.KafkaConfig.SASLOAuthScopes, + SASLOAuthGrantType: c.Sink.KafkaConfig.SASLOAuthGrantType, + SASLOAuthAudience: c.Sink.KafkaConfig.SASLOAuthAudience, + } + } } if c.Mounter != nil { res.Mounter = &config.MounterConfig{ @@ -386,6 +398,18 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { EnablePartitionSeparator: cloned.Sink.EnablePartitionSeparator, FileIndexWidth: cloned.Sink.FileIndexWidth, } + + if cloned.Sink.KafkaConfig != nil { + res.Sink.KafkaConfig = &KafkaConfig{ + SASLMechanism: cloned.Sink.KafkaConfig.SASLMechanism, + SASLOAuthClientID: cloned.Sink.KafkaConfig.SASLOAuthClientID, + SASLOAuthClientSecret: cloned.Sink.KafkaConfig.SASLOAuthClientSecret, + SASLOAuthTokenURL: cloned.Sink.KafkaConfig.SASLOAuthTokenURL, + SASLOAuthScopes: cloned.Sink.KafkaConfig.SASLOAuthScopes, + SASLOAuthGrantType: cloned.Sink.KafkaConfig.SASLOAuthGrantType, + SASLOAuthAudience: cloned.Sink.KafkaConfig.SASLOAuthAudience, + } + } } if cloned.Consistent != nil { res.Consistent = &ConsistentConfig{ @@ -515,6 +539,19 @@ type SinkConfig struct { DateSeparator string `json:"date_separator"` EnablePartitionSeparator bool `json:"enable_partition_separator"` FileIndexWidth int `json:"file_index_width"` + KafkaConfig *KafkaConfig `json:"kafka_config"` +} + +// KafkaConfig represents kafka config for a changefeed. +// This is a duplicate of config.KafkaConfig +type KafkaConfig struct { + SASLMechanism *string `json:"sasl_mechanism,omitempty"` + SASLOAuthClientID *string `json:"sasl_oauth_client_id,omitempty"` + SASLOAuthClientSecret *string `json:"sasl_oauth_client_secret,omitempty"` + SASLOAuthTokenURL *string `json:"sasl_oauth_token_url,omitempty"` + SASLOAuthScopes []string `json:"sasl_oauth_scopes,omitempty"` + SASLOAuthGrantType *string `json:"sasl_oauth_grant_type,omitempty"` + SASLOAuthAudience *string `json:"sasl_oauth_audience,omitempty"` } // CSVConfig denotes the csv config diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index ca5cd482da6..1ab888a90c0 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1858,6 +1858,35 @@ var doc = `{ } } }, + "v2.KafkaConfig": { + "type": "object", + "properties": { + "sasl_mechanism": { + "type": "string" + }, + "sasl_oauth_audience": { + "type": "string" + }, + "sasl_oauth_client_id": { + "type": "string" + }, + "sasl_oauth_client_secret": { + "type": "string" + }, + "sasl_oauth_grant_type": { + "type": "string" + }, + "sasl_oauth_scopes": { + "type": "array", + "items": { + "type": "string" + } + }, + "sasl_oauth_token_url": { + "type": "string" + } + } + }, "v2.LogLevelReq": { "type": "object", "properties": { @@ -2020,6 +2049,9 @@ var doc = `{ "file_index_width": { "type": "integer" }, + "kafka_config": { + "$ref": "#/definitions/v2.KafkaConfig" + }, "protocol": { "type": "string" }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index c0c12db0faa..d414ee3a010 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1839,6 +1839,35 @@ } } }, + "v2.KafkaConfig": { + "type": "object", + "properties": { + "sasl_mechanism": { + "type": "string" + }, + "sasl_oauth_audience": { + "type": "string" + }, + "sasl_oauth_client_id": { + "type": "string" + }, + "sasl_oauth_client_secret": { + "type": "string" + }, + "sasl_oauth_grant_type": { + "type": "string" + }, + "sasl_oauth_scopes": { + "type": "array", + "items": { + "type": "string" + } + }, + "sasl_oauth_token_url": { + "type": "string" + } + } + }, "v2.LogLevelReq": { "type": "object", "properties": { @@ -2001,6 +2030,9 @@ "file_index_width": { "type": "integer" }, + "kafka_config": { + "$ref": "#/definitions/v2.KafkaConfig" + }, "protocol": { "type": "string" }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index ba542b73a74..561dfdded1a 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -453,6 +453,25 @@ definitions: type: string type: array type: object + v2.KafkaConfig: + properties: + sasl_mechanism: + type: string + sasl_oauth_audience: + type: string + sasl_oauth_client_id: + type: string + sasl_oauth_client_secret: + type: string + sasl_oauth_grant_type: + type: string + sasl_oauth_scopes: + items: + type: string + type: array + sasl_oauth_token_url: + type: string + type: object v2.LogLevelReq: properties: log_level: @@ -559,6 +578,8 @@ definitions: type: integer file_index_width: type: integer + kafka_config: + $ref: '#/definitions/v2.KafkaConfig' protocol: type: string schema_registry: