This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: add more metrics; refine batch handle #590

Merged: 7 commits, Apr 10, 2020
39 changes: 39 additions & 0 deletions syncer/metrics.go
@@ -28,6 +28,24 @@ import (
)

var (
binlogReadDurationHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "syncer",
Name: "read_binlog_duration",
Help: "bucketed histogram of read time (s) for single binlog event from the relay log or master.",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
}, []string{"task"})

binlogEventSizeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "syncer",
Name: "binlog_event_size",
Help: "size of a binlog event",
Buckets: prometheus.ExponentialBuckets(16, 2, 20),
}, []string{"task"})

binlogEvent = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
@@ -37,6 +55,15 @@ var (
Buckets: prometheus.ExponentialBuckets(0.0005, 2, 18),
}, []string{"type", "task"})

conflictDetectDurationHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dm",
Subsystem: "syncer",
Name: "conflict_detect_duration",
Help: "bucketed histogram of conflict detect time (s) for single DML statement",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
}, []string{"task"})

binlogSkippedEventsTotal = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dm",
@@ -61,6 +88,14 @@ var (
Help: "total number of finished jobs",
}, []string{"type", "task", "queueNo"})

queueSizeGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
Subsystem: "syncer",
Name: "queue_size",
Help: "remain size of the DML queue",
}, []string{"task", "queueNo"})

binlogPosGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "dm",
@@ -148,10 +183,14 @@ var (

// RegisterMetrics registers metrics
func RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(binlogReadDurationHistogram)
registry.MustRegister(binlogEventSizeHistogram)
registry.MustRegister(binlogEvent)
registry.MustRegister(conflictDetectDurationHistogram)
registry.MustRegister(binlogSkippedEventsTotal)
registry.MustRegister(addedJobsTotal)
registry.MustRegister(finishedJobsTotal)
registry.MustRegister(queueSizeGauge)
registry.MustRegister(sqlRetriesTotal)
registry.MustRegister(binlogPosGauge)
registry.MustRegister(binlogFileGauge)
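As a side note on the bucket layout chosen above (an editorial sketch, not part of this diff): prometheus.ExponentialBuckets(0.00005, 2, 20) yields 20 upper bounds that double from 50µs up to 0.00005 * 2^19 ≈ 26.2s, which is the range the new read_binlog_duration and conflict_detect_duration histograms can resolve. A minimal program to print the boundaries:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same bucket layout as the new duration histograms in this PR.
	buckets := prometheus.ExponentialBuckets(0.00005, 2, 20)
	fmt.Printf("%d buckets, first %gs, last %gs\n",
		len(buckets), buckets[0], buckets[len(buckets)-1])
	// 20 buckets, first 5e-05s, last 26.2144s
}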
17 changes: 13 additions & 4 deletions syncer/syncer.go
@@ -998,6 +998,7 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo
for {
select {
case sqlJob, ok := <-jobChan:
queueSizeGauge.WithLabelValues(s.cfg.Name, queueBucket).Set(float64(len(jobChan)))
Collaborator

Multiple queues share the same bucket, which could be a problem in some cases.
It is recommended that both the slow query threshold and the bucket count be
made configurable; these can be optimized later.
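One possible shape for that suggestion, sketched here for illustration only — the MetricsConfig type and its field are hypothetical and not part of this PR:

package syncer

import "github.com/prometheus/client_golang/prometheus"

// MetricsConfig is a hypothetical knob for the bucket layout suggested above.
type MetricsConfig struct {
	// ReadBinlogBuckets overrides the default exponential buckets when non-empty.
	ReadBinlogBuckets []float64
}

func newBinlogReadDurationHistogram(cfg MetricsConfig) *prometheus.HistogramVec {
	buckets := prometheus.ExponentialBuckets(0.00005, 2, 20) // default used by this PR
	if len(cfg.ReadBinlogBuckets) > 0 {
		buckets = cfg.ReadBinlogBuckets
	}
	return prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Namespace: "dm",
		Subsystem: "syncer",
		Name:      "read_binlog_duration",
		Help:      "bucketed histogram of read time (s) for single binlog event from the relay log or master.",
		Buckets:   buckets,
	}, []string{"task"})
}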

if !ok {
return
}
@@ -1017,16 +1018,14 @@
clearF()
}

-default:
case <-time.After(waitTime):
if len(jobs) > 0 {
err = executeSQLs()
if err != nil {
fatalF(err, pb.ErrorType_ExecSQL)
continue
}
clearF()
-} else {
-time.Sleep(waitTime)
}
}
}
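The hunk above is the "refine batch handle" part of the PR: the busy default branch plus time.Sleep is replaced by a case <-time.After(waitTime), so the worker blocks on the job channel and flushes pending jobs only after waitTime of inactivity. A self-contained sketch of that pattern (illustrative names, not the PR's code):

package main

import (
	"fmt"
	"time"
)

// batcher collects jobs and flushes them either when the channel closes or
// when no new job has arrived within waitTime, mirroring the select shape above.
func batcher(jobs <-chan int, waitTime time.Duration, flush func([]int)) {
	var pending []int
	for {
		select {
		case j, ok := <-jobs:
			if !ok {
				if len(pending) > 0 {
					flush(pending)
				}
				return
			}
			pending = append(pending, j)
		case <-time.After(waitTime):
			if len(pending) > 0 {
				flush(pending)
				pending = pending[:0]
			}
		}
	}
}

func main() {
	jobs := make(chan int, 4)
	go func() {
		for i := 0; i < 3; i++ {
			jobs <- i
		}
		close(jobs)
	}()
	batcher(jobs, 10*time.Millisecond, func(b []int) { fmt.Println("flush", b) })
}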
@@ -1213,6 +1212,8 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
e = s.tryInject(latestOp, currentPos)
latestOp = null
}

startTime := time.Now()
if e == nil {
failpoint.Inject("SyncerEventTimeout", func(val failpoint.Value) {
if seconds, ok := val.(int); ok {
@@ -1225,7 +1226,6 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
cancel()
}

-startTime := time.Now()
if err == context.Canceled {
s.tctx.L().Info("binlog replication main routine quit(context canceled)!", zap.Stringer("last position", lastPos))
return nil
@@ -1266,6 +1266,11 @@ func (s *Syncer) Run(ctx context.Context) (err error) {

return err
}

// time duration for reading an event from relay log or upstream master.
binlogReadDurationHistogram.WithLabelValues(s.cfg.Name).Observe(time.Since(startTime).Seconds())
startTime = time.Now() // reset start time for the next metric.

// get binlog event, reset tryReSync, so we can re-sync binlog while syncer meets errors next time
tryReSync = true
binlogPosGauge.WithLabelValues("syncer", s.cfg.Name).Set(float64(e.Header.LogPos))
@@ -1276,6 +1281,7 @@ func (s *Syncer) Run(ctx context.Context) (err error) {
binlogFileGauge.WithLabelValues("syncer", s.cfg.Name).Set(float64(index))
}
s.binlogSizeCount.Add(int64(e.Header.EventSize))
binlogEventSizeHistogram.WithLabelValues(s.cfg.Name).Observe(float64(e.Header.EventSize))

failpoint.Inject("ProcessBinlogSlowDown", nil)

@@ -1937,10 +1943,13 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
}

func (s *Syncer) commitJob(tp opType, sourceSchema, sourceTable, targetSchema, targetTable, sql string, args []interface{}, keys []string, retry bool, pos, cmdPos mysql.Position, gs gtid.Set, traceID string) error {
startTime := time.Now()
key, err := s.resolveCasuality(keys)
if err != nil {
return terror.ErrSyncerUnitResolveCasualityFail.Generate(err)
}
conflictDetectDurationHistogram.WithLabelValues(s.cfg.Name).Observe(time.Since(startTime).Seconds())

job := newJob(tp, sourceSchema, sourceTable, targetSchema, targetTable, sql, args, key, pos, cmdPos, gs, traceID)
return s.addJobFunc(job)
}
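Finally, a hedged sketch (not part of this PR) of how a registry populated by RegisterMetrics above could be exposed for Prometheus to scrape, using the standard client_golang promhttp handler; the listen address is purely illustrative:

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	registry := prometheus.NewRegistry()
	// syncer.RegisterMetrics(registry) // as defined in syncer/metrics.go above
	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":8262", nil)
}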