Skip to content

Commit

Permalink
drainer: add KafkaClientID in DBConfig (#902) (#929)
Browse files Browse the repository at this point in the history
Improve configuration for kafka syncer:
Add kafka-client-id in drainer.toml to config kafka client.id property.

Co-authored-by: dixingxing <[email protected]>
  • Loading branch information
WangXiangUSTC and dixingxing0 authored Mar 6, 2020
1 parent 56ced1e commit e4f2ba2
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 0 deletions.
1 change: 1 addition & 0 deletions cmd/drainer/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ port = 3306
# kafka-addrs = "127.0.0.1:9092"
# kafka-version = "0.8.2.0"
# kafka-max-messages = 1024
# kafka-client-id = "tidb_binlog"
#
#
# the topic name drainer will push msg, the default name is <cluster-id>_obinlog
Expand Down
4 changes: 4 additions & 0 deletions drainer/sync/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ func NewKafka(cfg *DBConfig, tableInfoGetter translator.TableInfoGetter) (*Kafka
return nil, errors.Trace(err)
}

if len(cfg.KafkaClientID) > 0 {
config.ClientID = cfg.KafkaClientID
}

config.Producer.Flush.MaxMessages = cfg.KafkaMaxMessages

// maintain minimal set that has been necessary so far
Expand Down
1 change: 1 addition & 0 deletions drainer/sync/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type DBConfig struct {
KafkaAddrs string `toml:"kafka-addrs" json:"kafka-addrs"`
KafkaVersion string `toml:"kafka-version" json:"kafka-version"`
KafkaMaxMessages int `toml:"kafka-max-messages" json:"kafka-max-messages"`
KafkaClientID string `toml:"kafka-client-id" json:"kafka-client-id"`
TopicName string `toml:"topic-name" json:"topic-name"`
// get it from pd
ClusterID uint64 `toml:"-" json:"-"`
Expand Down
1 change: 1 addition & 0 deletions tests/binlog/drainer.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ port = 3306
# kafka-addrs = "127.0.0.1:9092"
# kafka-version = "0.8.2.0"
# kafka-max-messages = 1024
# kafka-client-id = "tidb_binlog"
#
#
# the topic name drainer will push msg, the default name is <cluster-id>_obinlog
Expand Down

0 comments on commit e4f2ba2

Please sign in to comment.