Skip to content

Commit

Permalink
pkg/sink(ticdc): add GetTopicConfig support (#9107)
Browse files Browse the repository at this point in the history
close #8959
  • Loading branch information
Rustin170506 authored Jun 1, 2023
1 parent bcf5d1b commit 493c0f6
Show file tree
Hide file tree
Showing 9 changed files with 125 additions and 28 deletions.
4 changes: 2 additions & 2 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -406,9 +406,9 @@ error = '''
kafka async send message failed
'''

["CDC:ErrKafkaBrokerConfigNotFound"]
["CDC:ErrKafkaConfigNotFound"]
error = '''
kafka broker config item not found
kafka config item not found
'''

["CDC:ErrKafkaCreateTopic"]
Expand Down
6 changes: 3 additions & 3 deletions pkg/errors/cdc_errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,9 @@ var (
"invalid topic expression",
errors.RFCCodeText("CDC:ErrKafkaTopicExprInvalid"),
)
ErrKafkaBrokerConfigNotFound = errors.Normalize(
"kafka broker config item not found",
errors.RFCCodeText("CDC:ErrKafkaBrokerConfigNotFound"),
ErrKafkaConfigNotFound = errors.Normalize(
"kafka config item not found",
errors.RFCCodeText("CDC:ErrKafkaConfigNotFound"),
)
ErrRedoConfigInvalid = errors.Normalize(
"redo log config invalid",
Expand Down
40 changes: 39 additions & 1 deletion pkg/sink/kafka/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,48 @@ func (a *saramaAdminClient) GetBrokerConfig(
zap.String("namespace", a.changefeed.Namespace),
zap.String("changefeed", a.changefeed.ID),
zap.String("configName", configName))
return "", cerror.ErrKafkaBrokerConfigNotFound.GenWithStack(
return "", cerror.ErrKafkaConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", configName)
}

func (a *saramaAdminClient) GetTopicConfig(ctx context.Context, topicName string, configName string) (string, error) {
var configEntries []sarama.ConfigEntry
var err error
query := func() error {
configEntries, err = a.admin.DescribeConfig(sarama.ConfigResource{
Type: sarama.TopicResource,
Name: topicName,
ConfigNames: []string{configName},
})
return err
}
err = a.queryClusterWithRetry(ctx, query)
if err != nil {
return "", err
}

// For compatibility with KOP, we checked all return values.
// 1. Kafka only returns requested configs.
// 2. Kop returns all configs.
for _, entry := range configEntries {
if entry.Name == configName {
log.Info("Kafka config item found",
zap.String("namespace", a.changefeed.Namespace),
zap.String("changefeed", a.changefeed.ID),
zap.String("configName", configName),
zap.String("configValue", entry.Value))
return entry.Value, nil
}
}

log.Warn("Kafka config item not found",
zap.String("namespace", a.changefeed.Namespace),
zap.String("changefeed", a.changefeed.ID),
zap.String("configName", configName))
return "", cerror.ErrKafkaConfigNotFound.GenWithStack(
"cannot find the `%s` from the topic's configuration", configName)
}

func (a *saramaAdminClient) GetTopicsMeta(
ctx context.Context,
topics []string,
Expand Down
4 changes: 3 additions & 1 deletion pkg/sink/kafka/cluster_admin_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ type TopicDetail struct {
Name string
NumPartitions int32
ReplicationFactor int16
ConfigEntries map[string]string
}

// Broker represents a Kafka broker.
Expand All @@ -42,6 +41,9 @@ type ClusterAdminClient interface {
// GetBrokerConfig return the broker level configuration with the `configName`
GetBrokerConfig(ctx context.Context, configName string) (string, error)

// GetTopicConfig return the topic level configuration with the `configName`
GetTopicConfig(ctx context.Context, topicName string, configName string) (string, error)

// GetTopicsMeta return all target topics' metadata
// if `ignoreTopicError` is true, ignore the topic error and return the metadata of valid topics
GetTopicsMeta(ctx context.Context,
Expand Down
29 changes: 23 additions & 6 deletions pkg/sink/kafka/cluster_admin_client_mock_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,30 +64,34 @@ type ClusterAdminClientMockImpl struct {
// Cluster controller ID.
controllerID int
brokerConfigs map[string]string
topicConfigs map[string]map[string]string
}

// NewClusterAdminClientMockImpl news a ClusterAdminClientMockImpl struct with default configurations.
func NewClusterAdminClientMockImpl() *ClusterAdminClientMockImpl {
topics := make(map[string]*topicDetail)
configEntries := make(map[string]string)
configEntries[TopicMaxMessageBytesConfigName] = TopicMaxMessageBytes
configEntries[MinInsyncReplicasConfigName] = MinInSyncReplicas
topics[DefaultMockTopicName] = &topicDetail{
fetchesRemainingUntilVisible: 0,
TopicDetail: TopicDetail{
Name: DefaultMockTopicName,
NumPartitions: 3,
ConfigEntries: configEntries,
},
}

brokerConfigs := make(map[string]string)
brokerConfigs[BrokerMessageMaxBytesConfigName] = BrokerMessageMaxBytes
brokerConfigs[MinInsyncReplicasConfigName] = MinInSyncReplicas

topicConfigs := make(map[string]map[string]string)
topicConfigs[DefaultMockTopicName] = make(map[string]string)
topicConfigs[DefaultMockTopicName][TopicMaxMessageBytesConfigName] = TopicMaxMessageBytes
topicConfigs[DefaultMockTopicName][MinInsyncReplicasConfigName] = MinInSyncReplicas

return &ClusterAdminClientMockImpl{
topics: topics,
controllerID: defaultMockControllerID,
brokerConfigs: brokerConfigs,
topicConfigs: topicConfigs,
}
}

Expand All @@ -108,12 +112,25 @@ func (c *ClusterAdminClientMockImpl) GetBrokerConfig(
) (string, error) {
value, ok := c.brokerConfigs[configName]
if !ok {
return "", errors.ErrKafkaBrokerConfigNotFound.GenWithStack(
return "", errors.ErrKafkaConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", configName)
}
return value, nil
}

// GetTopicConfig implement the ClusterAdminClient interface
func (c *ClusterAdminClientMockImpl) GetTopicConfig(ctx context.Context, topicName string, configName string) (string, error) {
if _, ok := c.topics[topicName]; !ok {
return "", errors.ErrKafkaConfigNotFound.GenWithStack("cannot find the `%s` from the topic's configuration", topicName)
}
value, ok := c.topicConfigs[topicName][configName]
if !ok {
return "", errors.ErrKafkaConfigNotFound.GenWithStack(
"cannot find the `%s` from the topic's configuration", configName)
}
return value, nil
}

// SetRemainingFetchesUntilTopicVisible is used to control the visibility of a specific topic.
// It is used to mock the topic creation delay.
func (c *ClusterAdminClientMockImpl) SetRemainingFetchesUntilTopicVisible(
Expand Down Expand Up @@ -183,7 +200,7 @@ func (c *ClusterAdminClientMockImpl) Close() {}

// SetMinInsyncReplicas sets the MinInsyncReplicas for broker and default topic.
func (c *ClusterAdminClientMockImpl) SetMinInsyncReplicas(minInsyncReplicas string) {
c.topics[DefaultMockTopicName].ConfigEntries[MinInsyncReplicasConfigName] = minInsyncReplicas
c.topicConfigs[DefaultMockTopicName][MinInsyncReplicasConfigName] = minInsyncReplicas
c.brokerConfigs[MinInsyncReplicasConfigName] = minInsyncReplicas
}

Expand Down
14 changes: 8 additions & 6 deletions pkg/sink/kafka/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ func AdjustOptions(
if exists {
// make sure that producer's `MaxMessageBytes` smaller than topic's `max.message.bytes`
topicMaxMessageBytesStr, err := getTopicConfig(
ctx, admin, info,
ctx, admin, info.Name,
TopicMaxMessageBytesConfigName,
BrokerMessageMaxBytesConfigName,
)
Expand Down Expand Up @@ -664,7 +664,7 @@ func validateMinInsyncReplicas(
info, exists := topics[topic]
if exists {
minInsyncReplicasStr, err := getTopicConfig(
ctx, admin, info,
ctx, admin, info.Name,
MinInsyncReplicasConfigName,
MinInsyncReplicasConfigName)
if err != nil {
Expand All @@ -685,7 +685,7 @@ func validateMinInsyncReplicas(
minInsyncReplicasStr, exists, err := minInsyncReplicasConfigGetter()
if err != nil {
// 'min.insync.replica' is invisible to us in Confluent Cloud Kafka.
if cerror.ErrKafkaBrokerConfigNotFound.Equal(err) {
if cerror.ErrKafkaConfigNotFound.Equal(err) {
log.Warn("TiCDC cannot find `min.insync.replicas` from broker's configuration, " +
"please make sure that the replication factor is greater than or equal " +
"to the minimum number of in-sync replicas" +
Expand Down Expand Up @@ -728,13 +728,15 @@ func validateMinInsyncReplicas(
func getTopicConfig(
ctx context.Context,
admin ClusterAdminClient,
detail TopicDetail,
topicName string,
topicConfigName string,
brokerConfigName string,
) (string, error) {
if a, ok := detail.ConfigEntries[topicConfigName]; ok {
return a, nil
if c, err := admin.GetTopicConfig(ctx, topicName, topicConfigName); err == nil {
return c, nil
}

log.Info("TiCDC cannot find the configuration from topic, try to get it from broker",
zap.String("topic", topicName), zap.String("config", topicConfigName))
return admin.GetBrokerConfig(ctx, brokerConfigName)
}
2 changes: 0 additions & 2 deletions pkg/sink/kafka/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,6 @@ func TestAdjustConfigTopicExist(t *testing.T) {
detail := &TopicDetail{
Name: topicName,
NumPartitions: 3,
// Does not contain `max.message.bytes`.
ConfigEntries: make(map[string]string),
}
err = adminClient.CreateTopic(context.Background(), detail, false)
require.Nil(t, err)
Expand Down
47 changes: 45 additions & 2 deletions pkg/sink/kafka/v2/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (a *admin) GetBrokerConfig(ctx context.Context, configName string) (string,
if len(resp.Resources) == 0 || len(resp.Resources[0].ConfigEntries) == 0 {
log.Warn("Kafka config item not found",
zap.String("configName", configName))
return "", errors.ErrKafkaBrokerConfigNotFound.GenWithStack(
return "", errors.ErrKafkaConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", configName)
}

Expand All @@ -113,10 +113,53 @@ func (a *admin) GetBrokerConfig(ctx context.Context, configName string) (string,

log.Warn("Kafka config item not found",
zap.String("configName", configName))
return "", errors.ErrKafkaBrokerConfigNotFound.GenWithStack(
return "", errors.ErrKafkaConfigNotFound.GenWithStack(
"cannot find the `%s` from the broker's configuration", configName)
}

func (a *admin) GetTopicConfig(ctx context.Context, topicName string, configName string) (string, error) {
request := &kafka.DescribeConfigsRequest{
Resources: []kafka.DescribeConfigRequestResource{
{
ResourceType: kafka.ResourceTypeTopic,
ResourceName: topicName,
ConfigNames: []string{configName},
},
},
}

resp, err := a.client.DescribeConfigs(ctx, request)
if err != nil {
return "", errors.Trace(err)
}

if len(resp.Resources) == 0 || len(resp.Resources[0].ConfigEntries) == 0 {
log.Warn("Kafka config item not found",
zap.String("configName", configName))
return "", errors.ErrKafkaConfigNotFound.GenWithStack(
"cannot find the `%s` from the topic's configuration", configName)
}

// For compatibility with KOP, we checked all return values.
// 1. Kafka only returns requested configs.
// 2. Kop returns all configs.
for _, entry := range resp.Resources[0].ConfigEntries {
if entry.ConfigName == configName {
log.Info("Kafka config item found",
zap.String("namespace", a.changefeedID.Namespace),
zap.String("changefeed", a.changefeedID.ID),
zap.String("configName", configName),
zap.String("configValue", entry.ConfigValue))
return entry.ConfigValue, nil
}
}

log.Warn("Kafka config item not found",
zap.String("configName", configName))
return "", errors.ErrKafkaConfigNotFound.GenWithStack(
"cannot find the `%s` from the topic's configuration", configName)
}

func (a *admin) GetTopicsMeta(
ctx context.Context,
topics []string,
Expand Down
7 changes: 2 additions & 5 deletions pkg/sink/kafka/v2/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestGetBrokerConfig(t *testing.T) {
Resources: []kafka.DescribeConfigResponseResource{},
}, nil)
result, err = admin.GetBrokerConfig(ctx, "test-config-name")
require.Error(t, err, errors.ErrKafkaBrokerConfigNotFound)
require.Error(t, err, errors.ErrKafkaConfigNotFound)
require.Equal(t, "", result)

// config is found
Expand Down Expand Up @@ -176,7 +176,7 @@ func TestGetBrokerConfig(t *testing.T) {
},
}, nil)
result, err = admin.GetBrokerConfig(ctx, "test-config-name")
require.Error(t, err, errors.ErrKafkaBrokerConfigNotFound)
require.Error(t, err, errors.ErrKafkaConfigNotFound)
require.Len(t, result, 0)
}

Expand Down Expand Up @@ -293,7 +293,6 @@ func TestCreateTopic(t *testing.T) {
Name: "topic-1",
NumPartitions: 1,
ReplicationFactor: 1,
ConfigEntries: nil,
}, false)
require.Error(t, err)

Expand All @@ -308,7 +307,6 @@ func TestCreateTopic(t *testing.T) {
Name: "topic-1",
NumPartitions: 1,
ReplicationFactor: 1,
ConfigEntries: nil,
}, false)
require.Error(t, err, "topic-1 error")

Expand All @@ -318,7 +316,6 @@ func TestCreateTopic(t *testing.T) {
Name: "topic-1",
NumPartitions: 1,
ReplicationFactor: 1,
ConfigEntries: nil,
}, false)
require.NoError(t, err)
}
Expand Down

0 comments on commit 493c0f6

Please sign in to comment.