Skip to content

Commit

Permalink
cdc/kafka: fix kafka metrics collector (#4969) (#4989)
Browse files Browse the repository at this point in the history
close #4966
  • Loading branch information
ti-chi-bot authored Mar 28, 2022
1 parent 303f402 commit 3b46dc1
Show file tree
Hide file tree
Showing 4 changed files with 772 additions and 215 deletions.
19 changes: 5 additions & 14 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ import (
const (
// defaultPartitionNum specifies the default number of partitions when we create the topic.
defaultPartitionNum = 3

// flushMetricsInterval specifies the interval of refresh sarama metrics.
flushMetricsInterval = 5 * time.Second
)

const (
Expand Down Expand Up @@ -78,8 +75,6 @@ type kafkaSaramaProducer struct {

role util.Role
id model.ChangeFeedID

metricsMonitor *saramaMetricsMonitor
}

type kafkaProducerClosingFlag = int32
Expand Down Expand Up @@ -213,6 +208,9 @@ func (k *kafkaSaramaProducer) Close() error {
if k.producersReleased {
// We need to guard against double closing the clients,
// which could lead to panic.
log.Warn("kafka producer already released",
zap.String("changefeed", k.id),
zap.Any("role", k.role))
return nil
}
k.producersReleased = true
Expand Down Expand Up @@ -254,8 +252,6 @@ func (k *kafkaSaramaProducer) Close() error {
zap.String("changefeed", k.id), zap.Any("role", k.role))
}

k.metricsMonitor.Cleanup()

// adminClient should be closed last, since `metricsMonitor` would use it when `Cleanup`.
start = time.Now()
if err := k.admin.Close(); err != nil {
Expand All @@ -277,17 +273,13 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
k.stop()
}()

ticker := time.NewTicker(flushMetricsInterval)
defer ticker.Stop()
for {
var ack *sarama.ProducerMessage
select {
case <-ctx.Done():
return ctx.Err()
case <-k.closeCh:
return nil
case <-ticker.C:
k.metricsMonitor.CollectMetrics()
case err := <-k.failpointCh:
log.Warn("receive from failpoint chan", zap.Error(err),
zap.String("changefeed", k.id), zap.Any("role", k.role))
Expand Down Expand Up @@ -342,6 +334,8 @@ func NewKafkaSaramaProducer(
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

runSaramaMetricsMonitor(ctx, saramaConfig.MetricRegistry, changefeedID, role, admin)

k := &kafkaSaramaProducer{
admin: admin,
client: client,
Expand All @@ -355,9 +349,6 @@ func NewKafkaSaramaProducer(

id: changefeedID,
role: role,

metricsMonitor: newSaramaMetricsMonitor(
saramaConfig.MetricRegistry, changefeedID, admin),
}
go func() {
if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled {
Expand Down
82 changes: 57 additions & 25 deletions cdc/sink/producer/kafka/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
package kafka

import (
"context"
"strconv"
"time"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/pkg/kafka"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/rcrowley/go-metrics"
"go.uber.org/zap"
Expand Down Expand Up @@ -174,17 +177,18 @@ const (

// saramaMetricsMonitor periodically translates sarama's internal go-metrics
// registry into Prometheus metrics, labeled by changefeed (and broker ID for
// broker-level metrics).
type saramaMetricsMonitor struct {
	// changefeedID is used as a Prometheus label and in log messages.
	changefeedID string
	// role identifies the component running the sink; only used for logging.
	role util.Role

	// registry is sarama's go-metrics registry the raw metrics are read from.
	registry metrics.Registry
	// admin is used to discover brokers via DescribeCluster.
	admin kafka.ClusterAdminClient

	// brokers caches every broker ID ever observed, so broker-level metrics
	// can still be collected (and later cleaned up) when the cluster becomes
	// unreachable.
	brokers map[int32]struct{}
}

// CollectMetrics collect all monitored metrics
func (sm *saramaMetricsMonitor) CollectMetrics() {
// collectMetrics collect all monitored metrics
func (sm *saramaMetricsMonitor) collectMetrics() {
sm.collectProducerMetrics()
if err := sm.collectBrokerMetrics(); err != nil {
log.Warn("collect broker metrics failed", zap.Error(err))
}
sm.collectBrokerMetrics()
}

func (sm *saramaMetricsMonitor) collectProducerMetrics() {
Expand Down Expand Up @@ -213,14 +217,28 @@ func getBrokerMetricName(prefix, brokerID string) string {
return prefix + brokerID
}

func (sm *saramaMetricsMonitor) collectBrokerMetrics() error {
func (sm *saramaMetricsMonitor) collectBrokers() {
start := time.Now()
brokers, _, err := sm.admin.DescribeCluster()
if err != nil {
return err
log.Warn("kafka cluster unreachable, "+
"use historical brokers to collect kafka broker level metrics",
zap.String("changefeed", sm.changefeedID),
zap.Any("role", sm.role),
zap.Duration("duration", time.Since(start)))
return
}

for _, b := range brokers {
brokerID := strconv.Itoa(int(b.ID()))
sm.brokers[b.ID()] = struct{}{}
}
}

func (sm *saramaMetricsMonitor) collectBrokerMetrics() {
sm.collectBrokers()

for id := range sm.brokers {
brokerID := strconv.Itoa(int(id))

incomingByteRateMetric := sm.registry.Get(getBrokerMetricName(incomingByteRateMetricNamePrefix, brokerID))
if meter, ok := incomingByteRateMetric.(metrics.Meter); ok {
Expand Down Expand Up @@ -262,22 +280,43 @@ func (sm *saramaMetricsMonitor) collectBrokerMetrics() error {
responseSizeGauge.WithLabelValues(sm.changefeedID, brokerID).Set(histogram.Snapshot().Mean())
}
}
return nil
}

func newSaramaMetricsMonitor(registry metrics.Registry, changefeedID string, admin kafka.ClusterAdminClient) *saramaMetricsMonitor {
return &saramaMetricsMonitor{
// flushMetricsInterval specifies the interval of refresh sarama metrics.
const flushMetricsInterval = 5 * time.Second

func runSaramaMetricsMonitor(ctx context.Context, registry metrics.Registry, changefeedID string,
role util.Role, admin kafka.ClusterAdminClient,
) {
monitor := &saramaMetricsMonitor{
changefeedID: changefeedID,
role: role,
registry: registry,
admin: admin,
brokers: make(map[int32]struct{}),
}

ticker := time.NewTicker(flushMetricsInterval)
go func() {
defer func() {
ticker.Stop()
monitor.cleanup()
}()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
monitor.collectMetrics()
}
}
}()
}

func (sm *saramaMetricsMonitor) Cleanup() {
// cleanup called when the changefeed / processor stop the kafka sink.
func (sm *saramaMetricsMonitor) cleanup() {
sm.cleanUpProducerMetrics()
if err := sm.cleanUpBrokerMetrics(); err != nil {
log.Warn("clean up broker metrics failed", zap.Error(err))
}
sm.cleanUpBrokerMetrics()
}

func (sm *saramaMetricsMonitor) cleanUpProducerMetrics() {
Expand All @@ -287,15 +326,9 @@ func (sm *saramaMetricsMonitor) cleanUpProducerMetrics() {
compressionRatioGauge.DeleteLabelValues(sm.changefeedID)
}

func (sm *saramaMetricsMonitor) cleanUpBrokerMetrics() error {
brokers, _, err := sm.admin.DescribeCluster()
if err != nil {
return err
}

for _, b := range brokers {
brokerID := strconv.Itoa(int(b.ID()))

func (sm *saramaMetricsMonitor) cleanUpBrokerMetrics() {
for id := range sm.brokers {
brokerID := strconv.Itoa(int(id))
incomingByteRateGauge.DeleteLabelValues(sm.changefeedID, brokerID)
outgoingByteRateGauge.DeleteLabelValues(sm.changefeedID, brokerID)
requestRateGauge.DeleteLabelValues(sm.changefeedID, brokerID)
Expand All @@ -305,5 +338,4 @@ func (sm *saramaMetricsMonitor) cleanUpBrokerMetrics() error {
responseRateGauge.DeleteLabelValues(sm.changefeedID, brokerID)
responseSizeGauge.DeleteLabelValues(sm.changefeedID, brokerID)
}
return nil
}
4 changes: 2 additions & 2 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ invalid admin job type: %d

["CDC:ErrInvalidChangefeedID"]
error = '''
bad changefeed id, please match the pattern "^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$, the length should no more than %d", eg, "simple-changefeed-task"
bad changefeed id, please match the pattern "^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$", the length should be no more than %d, eg, "simple-changefeed-task"
'''

["CDC:ErrInvalidDDLJob"]
Expand Down Expand Up @@ -1103,7 +1103,7 @@ file lock conflict: %s

["ErrRequestForwardErr"]
error = '''
request forward error, an request can only forward to owner one time
request forward error, a request can only be forwarded to the owner one time
'''

["ErrSortDirLockError"]
Expand Down
Loading

0 comments on commit 3b46dc1

Please sign in to comment.