Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cdc/sink: adjust kafka initialization logic #3192

Merged
merged 80 commits into from
Nov 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
80 commits
Select commit Hold shift + click to select a range
f71d6aa
add admin file
3AceShowHand Oct 29, 2021
01f2a59
rebase master.
3AceShowHand Nov 5, 2021
78b34dd
fix create topic.
3AceShowHand Oct 29, 2021
7487412
use config.partitionNum directly.
3AceShowHand Oct 29, 2021
8accab1
close admin.
3AceShowHand Oct 29, 2021
e17169d
fix testcase.
3AceShowHand Oct 29, 2021
2592d9a
tiny fix.
3AceShowHand Nov 5, 2021
eec9a3d
fix some typo.
3AceShowHand Nov 5, 2021
def3227
refactor parameter check in topicPreProcess.
3AceShowHand Nov 6, 2021
bc76e4a
fix kafka test.
3AceShowHand Nov 6, 2021
4a8e731
add a test for config initialization.
3AceShowHand Nov 6, 2021
62b8eff
remove unnecessary code in kafka test.
3AceShowHand Nov 6, 2021
4a780b3
add ut for initialization.
3AceShowHand Nov 6, 2021
57747c1
use PartitionNum.
3AceShowHand Nov 6, 2021
c9263ea
remove unnecessary kafka package at the moment.
3AceShowHand Nov 6, 2021
75e9091
add a log.
3AceShowHand Nov 6, 2021
2f48945
tiny fix.
3AceShowHand Nov 6, 2021
068371a
tiny fix.
3AceShowHand Nov 6, 2021
4924f90
use failpoint to workaround for NewClusterAdmin in test.
3AceShowHand Nov 6, 2021
69d3c1b
simple typo fix.
3AceShowHand Nov 6, 2021
a0f919a
add more test for preprocess.
3AceShowHand Nov 6, 2021
43a523e
disable the checkpoint.
3AceShowHand Nov 6, 2021
4f1f032
fix enable failpoint syntax.
3AceShowHand Nov 6, 2021
da10398
fix parameters.
3AceShowHand Nov 6, 2021
2f2e780
Adjust the layout of comment for create sarama config.
3AceShowHand Nov 6, 2021
4d73caa
adjust layout for sarama config producer.
3AceShowHand Nov 7, 2021
f371a32
fix test.
3AceShowHand Nov 8, 2021
7396f02
fix kafka version in mq_test.
3AceShowHand Nov 8, 2021
268519a
remove failpoint.
3AceShowHand Nov 8, 2021
41a93ea
remove control by variable.
3AceShowHand Nov 8, 2021
004e817
add failpoint.
3AceShowHand Nov 8, 2021
add6c07
add failpoint.
3AceShowHand Nov 8, 2021
58d40e9
fix failpoint interm.
3AceShowHand Nov 9, 2021
c873264
revert version.
3AceShowHand Nov 9, 2021
c65b5f0
try to fix max-message-size larger than topic.
3AceShowHand Nov 9, 2021
8f8d9aa
add a debug log.
3AceShowHand Nov 9, 2021
8dee36c
remove the log.
3AceShowHand Nov 9, 2021
dec15c0
create topic with max-message-size.
3AceShowHand Nov 9, 2021
8217ae3
fix comment.
3AceShowHand Nov 9, 2021
98977fd
Update cdc/sink/producer/kafka/kafka.go
3AceShowHand Nov 9, 2021
be59f9b
fix by code review suggestion.
3AceShowHand Nov 9, 2021
b49cf35
Update cdc/sink/producer/kafka/kafka.go
3AceShowHand Nov 9, 2021
7276058
Update cdc/sink/producer/kafka/kafka.go
3AceShowHand Nov 9, 2021
388e42c
Update cdc/sink/producer/kafka/kafka.go
3AceShowHand Nov 9, 2021
e495cb8
fix by code review suggestion.
3AceShowHand Nov 9, 2021
46d1243
fix log layout.
3AceShowHand Nov 9, 2021
72d05d5
make topic out of the Config.
3AceShowHand Nov 9, 2021
91ea692
fix test.
3AceShowHand Nov 9, 2021
73f7175
fix test.
3AceShowHand Nov 9, 2021
0eb4547
fix by code review suggestion.
3AceShowHand Nov 9, 2021
ba707c4
Rewrite the logic in topicPreProcess.
3AceShowHand Nov 9, 2021
02f5300
Adjust the layout of comment for create sarama config.
3AceShowHand Nov 9, 2021
e1e9d27
fix logic if topic already exist.
3AceShowHand Nov 9, 2021
a0b8611
Adjust the layout of comment for create sarama config.
3AceShowHand Nov 9, 2021
ab525f7
tiny fix.
3AceShowHand Nov 9, 2021
0957074
remove useless test.
3AceShowHand Nov 10, 2021
8bb0775
try to get broker configuration.
3AceShowHand Nov 10, 2021
26cf591
add print config entries.
3AceShowHand Nov 10, 2021
3466091
fix convert to string.
3AceShowHand Nov 10, 2021
a158a64
tiny fix.
3AceShowHand Nov 10, 2021
0386d93
move get broker config up.
3AceShowHand Nov 10, 2021
614cc5e
refine the logic when create the topic.
3AceShowHand Nov 10, 2021
e536a7c
remove initialize topic with max-message-bytes.
3AceShowHand Nov 10, 2021
7e8aa4b
adjust comment.
3AceShowHand Nov 10, 2021
3ceb5bf
fix the comment.
3AceShowHand Nov 10, 2021
6b6c4c4
fix the comment.
3AceShowHand Nov 10, 2021
cf7e0ca
only fetch message.max.bytes.
3AceShowHand Nov 10, 2021
b8852ae
refine the comment.
3AceShowHand Nov 10, 2021
dcb8562
if topic cannot find the max.message.bytes, find message.max.bytes fr…
3AceShowHand Nov 10, 2021
5d95d75
extract adjustPartitionNum.
3AceShowHand Nov 10, 2021
f24bc4e
remove TopicPreProcessCreate test.
3AceShowHand Nov 10, 2021
16e5c90
revert comment out test.
3AceShowHand Nov 10, 2021
cadfc72
check max-message-bytes only for default and craft protocol.
3AceShowHand Nov 11, 2021
5603c5a
fix make check.
3AceShowHand Nov 11, 2021
9e94a61
fix error handling.
3AceShowHand Nov 12, 2021
9d5b6a2
Update cdc/sink/mq.go
3AceShowHand Nov 12, 2021
bad0771
Merge branch 'master' into extract-kafka-admin
ti-chi-bot Nov 12, 2021
8cc9b91
Merge branch 'master' into extract-kafka-admin
ti-chi-bot Nov 12, 2021
26b770c
Merge branch 'master' into extract-kafka-admin
ti-chi-bot Nov 12, 2021
1e2b3c7
Merge branch 'master' into extract-kafka-admin
ti-chi-bot Nov 12, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,13 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi
topic := strings.TrimFunc(sinkURI.Path, func(r rune) bool {
return r == '/'
})
producer, err := kafka.NewKafkaSaramaProducer(ctx, sinkURI.Host, topic, config, errCh)
if topic == "" {
return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri")
}

var protocol codec.Protocol
protocol.FromString(replicaConfig.Sink.Protocol)
producer, err := kafka.NewKafkaSaramaProducer(ctx, topic, protocol, config, errCh)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -423,8 +429,8 @@ func newPulsarSink(ctx context.Context, sinkURI *url.URL, filter *filter.Filter,
if s != "" {
opts["max-batch-size"] = s
}
// For now, it's a place holder. Avro format have to make connection to Schema Registery,
// and it may needs credential.
// For now, it's a placeholder. Avro format have to make connection to Schema Registry,
// and it may need credential.
credential := &security.Credential{}
sink, err := newMqSink(ctx, credential, producer, filter, replicaConfig, opts, errCh)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions cdc/sink/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {
c.Assert(err, check.IsNil)
opts := map[string]string{}
errCh := make(chan error, 1)

c.Assert(failpoint.Enable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate")
}()

sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

Expand Down Expand Up @@ -161,6 +167,12 @@ func (s mqSinkSuite) TestKafkaSinkFilter(c *check.C) {
c.Assert(err, check.IsNil)
opts := map[string]string{}
errCh := make(chan error, 1)

c.Assert(failpoint.Enable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate", "return(true)"), check.IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/sink/producer/kafka/SkipTopicAutoCreate")
}()

sink, err := newKafkaSaramaSink(ctx, sinkURI, fr, replicaConfig, opts, errCh)
c.Assert(err, check.IsNil)

Expand Down
240 changes: 179 additions & 61 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,14 @@ import (
"go.uber.org/zap"
)

const defaultPartitionNum = 4
const defaultPartitionNum = 3
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved

// Config stores the Kafka configuration
// Config stores user specified Kafka producer configuration
type Config struct {
PartitionNum int32
BrokerEndpoints []string
PartitionNum int32

// User should make sure that `replication-factor` not greater than the number of kafka brokers.
ReplicationFactor int16

Version string
Expand All @@ -50,8 +53,8 @@ type Config struct {
ClientID string
Credential *security.Credential
SaslScram *security.SaslScram
// control whether to create topic and verify partition number
TopicPreProcess bool
// control whether to create topic
AutoCreate bool
}

// NewConfig returns a default Kafka configuration
Expand All @@ -64,12 +67,13 @@ func NewConfig() *Config {
Compression: "none",
Credential: &security.Credential{},
SaslScram: &security.SaslScram{},
TopicPreProcess: true,
AutoCreate: true,
}
}

// Initialize the kafka configuration
func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfig, opts map[string]string) error {
c.BrokerEndpoints = strings.Split(sinkURI.Host, ",")
params := sinkURI.Query()
s := params.Get("partition-num")
if s != "" {
Expand Down Expand Up @@ -155,7 +159,7 @@ func (c *Config) Initialize(sinkURI *url.URL, replicaConfig *config.ReplicaConfi
if err != nil {
return err
}
c.TopicPreProcess = autoCreate
c.AutoCreate = autoCreate
}

s = params.Get("protocol")
Expand Down Expand Up @@ -395,80 +399,122 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
}
}

// kafkaTopicPreProcess gets partition number from existing topic, if topic doesn't
// exit, creates it automatically.
func kafkaTopicPreProcess(topic, address string, config *Config, cfg *sarama.Config) (int32, error) {
admin, err := sarama.NewClusterAdmin(strings.Split(address, ","), cfg)
func topicPreProcess(topic string, protocol codec.Protocol, config *Config, saramaConfig *sarama.Config) error {
// FIXME: find a way to remove this failpoint for workload the unit test
failpoint.Inject("SkipTopicAutoCreate", func() {
failpoint.Return(nil)
})
admin, err := sarama.NewClusterAdmin(config.BrokerEndpoints, saramaConfig)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
defer func() {
err := admin.Close()
if err != nil {
log.Warn("close admin client failed", zap.Error(err))
if err := admin.Close(); err != nil {
log.Warn("close kafka cluster admin failed", zap.Error(err))
}
}()

topics, err := admin.ListTopics()
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
partitionNum := config.PartitionNum
topicDetail, exist := topics[topic]
if exist {
log.Info("get partition number of topic", zap.String("topic", topic), zap.Int32("partition_num", topicDetail.NumPartitions))
if partitionNum == 0 {
partitionNum = topicDetail.NumPartitions
} else if partitionNum < topicDetail.NumPartitions {
log.Warn("partition number assigned in sink-uri is less than that of topic", zap.Int32("topic partition num", topicDetail.NumPartitions))
} else if partitionNum > topicDetail.NumPartitions {
return 0, cerror.ErrKafkaInvalidPartitionNum.GenWithStack(
"partition number(%d) assigned in sink-uri is more than that of topic(%d)", partitionNum, topicDetail.NumPartitions)
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

info, created := topics[topic]
// once we have found the topic, no matter `auto-create-topic`, make sure user input parameters are valid.
if created {
// make sure that topic's `max.message.bytes` is not less than given `max-message-bytes`
// else the producer will send message that too large to make topic reject, then changefeed would error.
// only the default `open protocol` and `craft protocol` use `max-message-bytes`, so check this for them.
if protocol == codec.ProtocolDefault || protocol == codec.ProtocolCraft {
topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
if topicMaxMessageBytes < config.MaxMessageBytes {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
"topic already exist, and topic's max.message.bytes(%d) less than max-message-bytes(%d)."+
"Please make sure `max-message-bytes` not greater than topic `max.message.bytes`",
topicMaxMessageBytes, config.MaxMessageBytes)
}
}
} else {
if partitionNum == 0 {
partitionNum = defaultPartitionNum
log.Warn("topic not found and partition number is not specified, using default partition number", zap.String("topic", topic), zap.Int32("partition_num", partitionNum))

// no need to create the topic, but we would have to log user if they found enter wrong topic name later
if config.AutoCreate {
log.Warn("topic already exist, TiCDC will not create the topic",
zap.String("topic", topic), zap.Any("detail", info))
}
log.Info("create a topic", zap.String("topic", topic),
zap.Int32("partition_num", partitionNum),
zap.Int16("replication_factor", config.ReplicationFactor))
err := admin.CreateTopic(topic, &sarama.TopicDetail{
NumPartitions: partitionNum,
ReplicationFactor: config.ReplicationFactor,
}, false)
// TODO idenfity the cause of "Topic with this name already exists"
if err != nil && !strings.Contains(err.Error(), "already exists") {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)

if err := config.adjustPartitionNum(info.NumPartitions); err != nil {
return errors.Trace(err)
}

return nil
}

if !config.AutoCreate {
return cerror.ErrKafkaInvalidConfig.GenWithStack("`auto-create-topic` is false, and topic not found")
}

// when try to create the topic, we don't know how to set the `max.message.bytes` for the topic.
// Kafka would create the topic with broker's `message.max.bytes`,
// we have to make sure it's not greater than `max-message-bytes`
brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin)
if err != nil {
log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration")
return errors.Trace(err)
}

if brokerMessageMaxBytes < config.MaxMessageBytes {
return cerror.ErrKafkaInvalidConfig.GenWithStack(
"broker's message.max.bytes(%d) less than max-message-bytes(%d)"+
"Please make sure `max-message-bytes` not greater than broker's `message.max.bytes`",
brokerMessageMaxBytes, config.MaxMessageBytes)
}

// topic not created yet, and user does not specify the `partition-num` in the sink uri.
if config.PartitionNum == 0 {
config.PartitionNum = defaultPartitionNum
log.Warn("partition-num is not set, use the default partition count",
zap.String("topic", topic), zap.Int32("partitions", config.PartitionNum))
}

return partitionNum, nil
err = admin.CreateTopic(topic, &sarama.TopicDetail{
NumPartitions: config.PartitionNum,
ReplicationFactor: config.ReplicationFactor,
}, false)
3AceShowHand marked this conversation as resolved.
Show resolved Hide resolved
// TODO identify the cause of "Topic with this name already exists"
if err != nil && !strings.Contains(err.Error(), "already exists") {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

log.Info("TiCDC create the topic",
zap.Int32("partition-num", config.PartitionNum),
zap.Int16("replication-factor", config.ReplicationFactor))

return nil
}

var newSaramaConfigImpl = newSaramaConfig

// NewKafkaSaramaProducer creates a kafka sarama producer
func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
func NewKafkaSaramaProducer(ctx context.Context, topic string, protocol codec.Protocol, config *Config, errCh chan error) (*kafkaSaramaProducer, error) {
log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
cfg, err := newSaramaConfigImpl(ctx, config)
if err != nil {
return nil, err
}
asyncClient, err := sarama.NewAsyncProducer(strings.Split(address, ","), cfg)
if err != nil {

if err := topicPreProcess(topic, protocol, config, cfg); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
syncClient, err := sarama.NewSyncProducer(strings.Split(address, ","), cfg)

asyncClient, err := sarama.NewAsyncProducer(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

partitionNum := config.PartitionNum
if config.TopicPreProcess {
partitionNum, err = kafkaTopicPreProcess(topic, address, config, cfg)
if err != nil {
return nil, err
}
syncClient, err := sarama.NewSyncProducer(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

notifier := new(notify.Notifier)
Expand All @@ -480,11 +526,11 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c
asyncClient: asyncClient,
syncClient: syncClient,
topic: topic,
partitionNum: partitionNum,
partitionNum: config.PartitionNum,
partitionOffset: make([]struct {
flushed uint64
sent uint64
}, partitionNum),
}, config.PartitionNum),
flushedNotifier: notifier,
flushedReceiver: flushedReceiver,
closeCh: make(chan struct{}),
Expand Down Expand Up @@ -550,7 +596,10 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
}
config.Version = version
// See: https://kafka.apache.org/documentation/#replication
// When one of the brokers in a Kafka cluster is down, the partition leaders in this broker is broken, Kafka will election a new partition leader and replication logs, this process will last from a few seconds to a few minutes. Kafka cluster will not provide a writing service in this process.
// When one of the brokers in a Kafka cluster is down, the partition leaders
// in this broker is broken, Kafka will election a new partition leader and
// replication logs, this process will last from a few seconds to a few minutes.
// Kafka cluster will not provide a writing service in this process.
// Time out in one minute(120 * 500ms).
config.Metadata.Retry.Max = 120
config.Metadata.Retry.Backoff = 500 * time.Millisecond
Expand All @@ -561,6 +610,10 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config.Producer.Return.Errors = true
config.Producer.RequiredAcks = sarama.WaitForAll

// Time out in five minutes(600 * 500ms).
config.Producer.Retry.Max = 600
config.Producer.Retry.Backoff = 500 * time.Millisecond

switch strings.ToLower(strings.TrimSpace(c.Compression)) {
case "none":
config.Producer.Compression = sarama.CompressionNone
Expand All @@ -577,10 +630,6 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config.Producer.Compression = sarama.CompressionNone
}

// Time out in five minutes(600 * 500ms).
config.Producer.Retry.Max = 600
config.Producer.Retry.Backoff = 500 * time.Millisecond

// Time out in one minute(120 * 500ms).
config.Admin.Retry.Max = 120
config.Admin.Retry.Backoff = 500 * time.Millisecond
Expand Down Expand Up @@ -610,3 +659,72 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {

return config, err
}

func getBrokerMessageMaxBytes(admin sarama.ClusterAdmin) (int, error) {
target := "message.max.bytes"
_, controllerID, err := admin.DescribeCluster()
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

configEntries, err := admin.DescribeConfig(sarama.ConfigResource{
Type: sarama.BrokerResource,
Name: strconv.Itoa(int(controllerID)),
ConfigNames: []string{target},
})
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

if len(configEntries) == 0 || configEntries[0].Name != target {
return 0, cerror.ErrKafkaNewSaramaProducer.GenWithStack(
"since cannot find the `message.max.bytes` from the broker's configuration, " +
"ticdc decline to create the topic and changefeed to prevent potential error")
}

result, err := strconv.Atoi(configEntries[0].Value)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

return result, nil
}

func getTopicMaxMessageBytes(admin sarama.ClusterAdmin, info sarama.TopicDetail) (int, error) {
if a, ok := info.ConfigEntries["max.message.bytes"]; ok {
result, err := strconv.Atoi(*a)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
return result, nil
}

return getBrokerMessageMaxBytes(admin)
}

// adjust the partition-num by the topic's partition count
func (c *Config) adjustPartitionNum(realPartitionCount int32) error {
// user does not specify the `partition-num` in the sink-uri
if c.PartitionNum == 0 {
c.PartitionNum = realPartitionCount
return nil
}

if c.PartitionNum < realPartitionCount {
log.Warn("number of partition specified in sink-uri is less than that of the actual topic. "+
"Some partitions will not have messages dispatched to",
zap.Int32("sink-uri partitions", c.PartitionNum),
zap.Int32("topic partitions", realPartitionCount))
return nil
}

// Make sure that the user-specified `partition-num` is not greater than
// the real partition count, since messages would be dispatched to different
// partitions, this could prevent potential correctness problems.
if c.PartitionNum > realPartitionCount {
return cerror.ErrKafkaInvalidPartitionNum.GenWithStack(
"the number of partition (%d) specified in sink-uri is more than that of actual topic (%d)",
c.PartitionNum, realPartitionCount)
}
return nil
}
Loading