Skip to content

Commit

Permalink
kafka(ticdc): sarama admin client fetch metadata by cache (#9511)
Browse files Browse the repository at this point in the history
close #9504
  • Loading branch information
3AceShowHand authored Aug 11, 2023
1 parent 0eef200 commit 447e512
Show file tree
Hide file tree
Showing 10 changed files with 137 additions and 270 deletions.
45 changes: 20 additions & 25 deletions cdc/sink/dmlsink/mq/manager/kafka_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewKafkaTopicManager(
changefeedID model.ChangeFeedID,
admin kafka.ClusterAdminClient,
cfg *kafka.AutoCreateTopicConfig,
) (*kafkaTopicManager, error) {
) *kafkaTopicManager {
mgr := &kafkaTopicManager{
changefeedID: changefeedID,
admin: admin,
Expand All @@ -70,7 +70,7 @@ func NewKafkaTopicManager(
// Background refresh metadata.
go mgr.backgroundRefreshMeta(ctx)

return mgr, nil
return mgr
}

// GetPartitionNum returns the number of partitions of the topic.
Expand Down Expand Up @@ -102,20 +102,12 @@ func (m *kafkaTopicManager) backgroundRefreshMeta(ctx context.Context) {
)
return
case <-m.metaRefreshTicker.C:
topicMetaList, err := m.getMetadataOfTopics(ctx)
// We ignore the error here, because the error may be caused by the
// network problem, and we can try to get the metadata next time.
if err != nil {
log.Warn("Get metadata of topics failed",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Error(err))
topicPartitionNums, _ := m.fetchAllTopicsPartitionsNum(ctx)
for topic, partitionNum := range topicPartitionNums {
m.tryUpdatePartitionsAndLogging(topic, partitionNum)
}

for topic, detail := range topicMetaList {
m.tryUpdatePartitionsAndLogging(topic, detail.NumPartitions)
}

}
}
}
Expand Down Expand Up @@ -147,21 +139,21 @@ func (m *kafkaTopicManager) tryUpdatePartitionsAndLogging(topic string, partitio
}
}

func (m *kafkaTopicManager) getMetadataOfTopics(
// fetchAllTopicsPartitionsNum fetches the number of partitions of all topics.
// The error returned by this method may be a transient error that the underlying logic can recover from.
// Be cautious when handling this error.
// Simply propagating the error to the caller may hurt the robustness of your program.
func (m *kafkaTopicManager) fetchAllTopicsPartitionsNum(
ctx context.Context,
) (map[string]kafka.TopicDetail, error) {
var topicList []string

) (map[string]int32, error) {
var topics []string
m.topics.Range(func(key, value any) bool {
topic := key.(string)
topicList = append(topicList, topic)

topics = append(topics, key.(string))
return true
})

start := time.Now()
// ignore the topic with error, return a subset of all topics.
topicMetaList, err := m.admin.GetTopicsMeta(ctx, topicList, true)
numPartitions, err := m.admin.GetTopicsPartitionsNum(ctx, topics)
if err != nil {
log.Warn(
"Kafka admin client describe topics failed",
Expand All @@ -179,7 +171,7 @@ func (m *kafkaTopicManager) getMetadataOfTopics(
zap.String("changefeed", m.changefeedID.ID),
zap.Duration("duration", time.Since(start)))

return topicMetaList, nil
return numPartitions, nil
}

// waitUntilTopicVisible is called after CreateTopic to make sure the topic
Expand All @@ -194,6 +186,8 @@ func (m *kafkaTopicManager) waitUntilTopicVisible(
topics := []string{topicName}
err := retry.Do(ctx, func() error {
start := time.Now()
// ignoreTopicError is set to false because the topic has just been created
// and we need to make sure that it is visible.
meta, err := m.admin.GetTopicsMeta(ctx, topics, false)
if err != nil {
log.Warn(" topic not found, retry it",
Expand Down Expand Up @@ -267,10 +261,11 @@ func (m *kafkaTopicManager) createTopic(

// CreateTopicAndWaitUntilVisible wraps createTopic and waitUntilTopicVisible together.
func (m *kafkaTopicManager) CreateTopicAndWaitUntilVisible(
ctx context.Context,
topicName string,
ctx context.Context, topicName string,
) (int32, error) {
// If the topic is not in the cache, we try to get the metadata of the topic.
// ignoreTopicErr is set to true to ignore the error if the topic is not found,
// which means we should create the topic later.
topicDetails, err := m.admin.GetTopicsMeta(ctx, []string{topicName}, true)
if err != nil {
return 0, errors.Trace(err)
Expand Down
24 changes: 5 additions & 19 deletions cdc/sink/dmlsink/mq/manager/kafka_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ func TestPartitions(t *testing.T) {
}

ctx := context.Background()
manager, err := NewKafkaTopicManager(ctx,
model.DefaultChangeFeedID("test"), adminClient, cfg)
require.Nil(t, err)
manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg)
defer manager.Close()

partitionsNum, err := manager.GetPartitionNum(
Expand All @@ -58,10 +56,7 @@ func TestCreateTopic(t *testing.T) {
}

ctx := context.Background()
manager, err := NewKafkaTopicManager(ctx,
model.DefaultChangeFeedID("test"),
adminClient, cfg)
require.Nil(t, err)
manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.CreateTopicAndWaitUntilVisible(ctx, kafka.DefaultMockTopicName)
require.Nil(t, err)
Expand All @@ -76,10 +71,7 @@ func TestCreateTopic(t *testing.T) {

// Try to create a topic without auto create.
cfg.AutoCreate = false
manager, err = NewKafkaTopicManager(ctx,
model.DefaultChangeFeedID("test"),
adminClient, cfg)
require.Nil(t, err)
manager = NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg)
defer manager.Close()
_, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic2")
require.Regexp(
Expand All @@ -95,10 +87,7 @@ func TestCreateTopic(t *testing.T) {
PartitionNum: 2,
ReplicationFactor: 4,
}
manager, err = NewKafkaTopicManager(ctx,
model.DefaultChangeFeedID("test"),
adminClient, cfg)
require.Nil(t, err)
manager = NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg)
defer manager.Close()
_, err = manager.CreateTopicAndWaitUntilVisible(ctx, "new-topic-failed")
require.Regexp(
Expand All @@ -120,10 +109,7 @@ func TestCreateTopicWithDelay(t *testing.T) {
}

ctx := context.Background()
manager, err := NewKafkaTopicManager(ctx,
model.DefaultChangeFeedID("test"),
adminClient, cfg)
require.Nil(t, err)
manager := NewKafkaTopicManager(ctx, model.DefaultChangeFeedID("test"), adminClient, cfg)
defer manager.Close()
partitionNum, err := manager.createTopic(ctx, "new_topic")
require.Nil(t, err)
Expand Down
10 changes: 2 additions & 8 deletions cdc/sink/util/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,9 @@ func GetTopicManagerAndTryCreateTopic(
topicCfg *kafka.AutoCreateTopicConfig,
adminClient kafka.ClusterAdminClient,
) (manager.TopicManager, error) {
topicManager, err := manager.NewKafkaTopicManager(
ctx,
changefeedID,
adminClient,
topicCfg,
topicManager := manager.NewKafkaTopicManager(
ctx, changefeedID, adminClient, topicCfg,
)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewProducer, err)
}

if _, err := topicManager.CreateTopicAndWaitUntilVisible(ctx, topic); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaCreateTopic, err)
Expand Down
Loading

0 comments on commit 447e512

Please sign in to comment.