Skip to content

Commit

Permalink
cdc/metrics: Integrate sarama producer metrics (pingcap#4520) (pingca…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Feb 25, 2022
1 parent 3441303 commit 2817747
Show file tree
Hide file tree
Showing 6 changed files with 1,702 additions and 189 deletions.
2 changes: 2 additions & 0 deletions cdc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tiflow/cdc/puller"
"github.com/pingcap/tiflow/cdc/puller/sorter"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/cdc/sink/producer/kafka"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/prometheus/client_golang/prometheus"
)
Expand All @@ -42,4 +43,5 @@ func init() {
tablepipeline.InitMetrics(registry)
owner.InitMetrics(registry)
initServerMetrics(registry)
kafka.InitMetrics(registry)
}
9 changes: 6 additions & 3 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/filter"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -90,7 +91,9 @@ func ddlSinkInitializer(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeF
return errors.Trace(err)
}

s, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, a.errCh)
stdCtx := util.PutChangefeedIDInCtx(ctx, id)
stdCtx = util.PutRoleInCtx(stdCtx, util.RoleOwner)
s, err := sink.NewSink(stdCtx, id, info.SinkURI, filter, info.Config, info.Opts, a.errCh)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -99,13 +102,13 @@ func ddlSinkInitializer(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeF
if !info.SyncPointEnabled {
return nil
}
syncPointStore, err := sink.NewSyncpointStore(ctx, id, info.SinkURI)
syncPointStore, err := sink.NewSyncpointStore(stdCtx, id, info.SinkURI)
if err != nil {
return errors.Trace(err)
}
a.syncPointStore = syncPointStore

if err := a.syncPointStore.CreateSynctable(ctx); err != nil {
if err := a.syncPointStore.CreateSynctable(stdCtx); err != nil {
return errors.Trace(err)
}
return nil
Expand Down
38 changes: 32 additions & 6 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,20 @@ import (
"go.uber.org/zap"
)

const defaultPartitionNum = 3
const (
// defaultPartitionNum specifies the default number of partitions when we create the topic.
defaultPartitionNum = 3

// flushMetricsInterval specifies how often the sarama metrics are refreshed.
// It is the period of the ticker in run that triggers metricsMonitor.CollectMetrics.
flushMetricsInterval = 5 * time.Second
)

type kafkaSaramaProducer struct {
// clientLock guards concurrent access to asyncProducer and syncProducer.
// These two clients (each of which has an input chan) are not closed from the
// sender routine, so without the lock a data race or a send on a closed chan could happen.
clientLock sync.RWMutex
admin kafka.ClusterAdminClient
client sarama.Client
asyncProducer sarama.AsyncProducer
syncProducer sarama.SyncProducer
Expand All @@ -67,6 +74,8 @@ type kafkaSaramaProducer struct {

role util.Role
id model.ChangeFeedID

metricsMonitor *saramaMetricsMonitor
}

type kafkaProducerClosingFlag = int32
Expand Down Expand Up @@ -253,6 +262,19 @@ func (k *kafkaSaramaProducer) Close() error {
log.Info("sync client closed", zap.Duration("duration", time.Since(start)),
zap.String("changefeed", k.id), zap.Any("role", k.role))
}

k.metricsMonitor.Cleanup()

start = time.Now()
if err := k.admin.Close(); err != nil {
log.Warn("close kafka cluster admin with error", zap.Error(err),
zap.Duration("duration", time.Since(start)),
zap.String("changefeed", k.id), zap.Any("role", k.role))
} else {
log.Info("kafka cluster admin closed", zap.Duration("duration", time.Since(start)),
zap.String("changefeed", k.id), zap.Any("role", k.role))
}

return nil
}

Expand All @@ -263,12 +285,17 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
zap.String("changefeed", k.id), zap.Any("role", k.role))
k.stop()
}()

ticker := time.NewTicker(flushMetricsInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-k.closeCh:
return nil
case <-ticker.C:
k.metricsMonitor.CollectMetrics()
case err := <-k.failpointCh:
log.Warn("receive from failpoint chan", zap.Error(err),
zap.String("changefeed", k.id), zap.Any("role", k.role))
Expand Down Expand Up @@ -312,11 +339,6 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, o
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
defer func() {
if err := admin.Close(); err != nil {
log.Warn("close kafka cluster admin failed", zap.Error(err))
}
}()

if err := validateAndCreateTopic(admin, topic, config, cfg); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
Expand Down Expand Up @@ -344,6 +366,7 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, o
return nil, err
}
k := &kafkaSaramaProducer{
admin: admin,
client: client,
asyncProducer: asyncProducer,
syncProducer: syncProducer,
Expand All @@ -361,6 +384,9 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, o

id: changefeedID,
role: role,

metricsMonitor: newSaramaMetricsMonitor(cfg.MetricRegistry,
util.CaptureAddrFromCtx(ctx), changefeedID, admin),
}
go func() {
if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled {
Expand Down
Loading

0 comments on commit 2817747

Please sign in to comment.