diff --git a/cdc/sinkv2/eventsink/txn/txn_sink.go b/cdc/sinkv2/eventsink/txn/txn_sink.go
index 7e02496056d..8a440acd801 100644
--- a/cdc/sinkv2/eventsink/txn/txn_sink.go
+++ b/cdc/sinkv2/eventsink/txn/txn_sink.go
@@ -113,7 +113,7 @@ func newSink(ctx context.Context, backends []backend,
     for i, backend := range backends {
         w := newWorker(ctx1, i, backend, len(backends))
         txnCh := sink.alive.conflictDetector.GetOutChByCacheID(int64(i))
-        g.Go(func() error { return w.runLoop(txnCh) })
+        g.Go(func() error { return w.run(txnCh) })
         sink.workers = append(sink.workers, w)
     }
 
diff --git a/cdc/sinkv2/eventsink/txn/worker.go b/cdc/sinkv2/eventsink/txn/worker.go
index 24462beee9f..c387cec835f 100644
--- a/cdc/sinkv2/eventsink/txn/worker.go
+++ b/cdc/sinkv2/eventsink/txn/worker.go
@@ -39,7 +39,7 @@ type worker struct {
     metricConflictDetectDuration prometheus.Observer
     metricQueueDuration          prometheus.Observer
     metricTxnWorkerFlushDuration prometheus.Observer
-    metricTxnWorkerBusyRatio     prometheus.Counter
+    metricTxnWorkerTotalDuration prometheus.Observer
     metricTxnWorkerHandledRows   prometheus.Counter
 
     // Fields only used in the background loop.
@@ -61,8 +61,8 @@ func newWorker(ctx context.Context, ID int, backend backend, workerCount int) *w
         metricConflictDetectDuration: txn.ConflictDetectDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
         metricQueueDuration:          txn.QueueDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
-        metricTxnWorkerFlushDuration: txn.WorkerFlushDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
-        metricTxnWorkerBusyRatio:     txn.WorkerBusyRatio.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
+        metricTxnWorkerFlushDuration: txn.WorkerFlushDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),
+        metricTxnWorkerTotalDuration: txn.WorkerTotalDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),
         metricTxnWorkerHandledRows:   txn.WorkerHandledRows.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),
 
         flushInterval: backend.MaxFlushInterval(),
 
@@ -72,7 +72,7 @@ func newWorker(ctx context.Context, ID int, backend backend, workerCount int) *w
 }
 
 // Run a loop.
-func (w *worker) runLoop(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error {
+func (w *worker) run(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error {
     defer func() {
         if err := w.backend.Close(); err != nil {
             log.Info("Transaction dmlSink backend close fail",
@@ -85,14 +85,7 @@ func (w *worker) runLoop(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) erro
         zap.String("changefeedID", w.changefeed),
         zap.Int("workerID", w.ID))
 
-    ticker := time.NewTicker(w.flushInterval)
-    defer ticker.Stop()
-
-    needFlush := false
-    var flushTimeSlice, totalTimeSlice time.Duration
-    overseerTicker := time.NewTicker(time.Second)
-    defer overseerTicker.Stop()
-    startToWork := time.Now()
+    start := time.Now()
     for {
         select {
         case <-w.ctx.Done():
@@ -101,27 +94,43 @@ func (w *worker) runLoop(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) erro
                 zap.Int("workerID", w.ID))
             return nil
         case txn := <-txnCh:
+            // Keep draining txnCh until there is no more data or the batch reaches a flushable state.
+            // If txnCh is drained before the batch is flushable, wait up to the flush interval
+            // before flushing, to avoid many small flushes that carry only a few txns each.
             if txn.TxnEvent != nil {
-                needFlush = w.onEvent(txn.TxnEvent, txn.PostTxnExecuted)
-            }
-        case <-ticker.C:
-            needFlush = true
-        case now := <-overseerTicker.C:
-            totalTimeSlice = now.Sub(startToWork)
-            busyRatio := int(flushTimeSlice.Seconds() / totalTimeSlice.Seconds() * 1000)
-            w.metricTxnWorkerBusyRatio.Add(float64(busyRatio) / float64(w.workerCount))
-            startToWork = now
-            flushTimeSlice = 0
-        }
-        if needFlush {
-            if err := w.doFlush(&flushTimeSlice); err != nil {
-                log.Error("Transaction dmlSink worker exits unexpectly",
-                    zap.String("changefeedID", w.changefeed),
-                    zap.Int("workerID", w.ID),
-                    zap.Error(err))
-                return err
+                needFlush := w.onEvent(txn.TxnEvent, txn.PostTxnExecuted)
+                if !needFlush {
+                    delay := time.NewTimer(w.flushInterval)
+                    for !needFlush {
+                        select {
+                        case txn := <-txnCh:
+                            needFlush = w.onEvent(txn.TxnEvent, txn.PostTxnExecuted)
+                        case <-delay.C:
+                            needFlush = true
+                        }
+                    }
+                    // Stop the timer and drain its channel to release resources promptly.
+                    if !delay.Stop() {
+                        select {
+                        case <-delay.C:
+                        default:
+                        }
+                    }
+                }
+                // needFlush is guaranteed to be true here, so flush now.
+                if err := w.doFlush(); err != nil {
+                    log.Error("Transaction dmlSink worker exits unexpectedly",
+                        zap.String("changefeedID", w.changefeed),
+                        zap.Int("workerID", w.ID),
+                        zap.Error(err))
+                    return err
+                }
+                // Record the total elapsed time so that the worker busy ratio can be derived.
+                // Observe it after the flush completes so that the flush duration and the
+                // total duration cover the same interval.
+                w.metricTxnWorkerTotalDuration.Observe(time.Since(start).Seconds())
+                start = time.Now()
             }
-            needFlush = false
         }
     }
 }
@@ -148,16 +157,12 @@ func (w *worker) onEvent(txn *txnEvent, postTxnExecuted func()) bool {
 }
 
 // doFlush flushes the backend.
-// It returns true only if it can no longer be flushed.
-func (w *worker) doFlush(flushTimeSlice *time.Duration) error {
+func (w *worker) doFlush() error {
     if w.hasPending {
         start := time.Now()
         defer func() {
-            elapsed := time.Since(start)
-            *flushTimeSlice += elapsed
-            w.metricTxnWorkerFlushDuration.Observe(elapsed.Seconds())
+            w.metricTxnWorkerFlushDuration.Observe(time.Since(start).Seconds())
         }()
-
         if err := w.backend.Flush(w.ctx); err != nil {
             log.Warn("Transaction sink backend flush fail",
                 zap.String("changefeedID", w.changefeed),
diff --git a/cdc/sinkv2/metrics/txn/metrics.go b/cdc/sinkv2/metrics/txn/metrics.go
index 94c37b3eb3e..e752c39e625 100644
--- a/cdc/sinkv2/metrics/txn/metrics.go
+++ b/cdc/sinkv2/metrics/txn/metrics.go
@@ -44,15 +44,16 @@
             Name:      "txn_worker_flush_duration",
             Help:      "Flush duration (s) for txn worker.",
             Buckets:   prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms~524s
-        }, []string{"namespace", "changefeed"})
+        }, []string{"namespace", "changefeed", "id"})
 
-    WorkerBusyRatio = prometheus.NewCounterVec(
-        prometheus.CounterOpts{
+    WorkerTotalDuration = prometheus.NewHistogramVec(
+        prometheus.HistogramOpts{
             Namespace: "ticdc",
             Subsystem: "sinkv2",
-            Name:      "txn_worker_busy_ratio",
-            Help:      "Busy ratio (X ms in 1s) for all workers.",
-        }, []string{"namespace", "changefeed"})
+            Name:      "txn_worker_total_duration",
+            Help:      "Total duration (s) for txn worker.",
+            Buckets:   prometheus.ExponentialBuckets(0.0001, 2, 20), // 0.1ms~52.4s
+        }, []string{"namespace", "changefeed", "id"})
 
     WorkerHandledRows = prometheus.NewCounterVec(
         prometheus.CounterOpts{
@@ -86,7 +87,7 @@ func InitMetrics(registry *prometheus.Registry) {
     registry.MustRegister(ConflictDetectDuration)
     registry.MustRegister(QueueDuration)
     registry.MustRegister(WorkerFlushDuration)
-    registry.MustRegister(WorkerBusyRatio)
+    registry.MustRegister(WorkerTotalDuration)
     registry.MustRegister(WorkerHandledRows)
     registry.MustRegister(SinkDMLBatchCommit)
     registry.MustRegister(SinkDMLBatchCallback)
diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json
index 1406df39049..e27d56f8151 100644
--- a/metrics/grafana/ticdc.json
+++ b/metrics/grafana/ticdc.json
@@ -7481,9 +7481,9 @@
       "targets": [
         {
           "exemplar": true,
-          "expr": "sum(rate(ticdc_sinkv2_txn_worker_busy_ratio{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])/10) by (changefeed,instance)",
+          "expr": "sum(rate(ticdc_sinkv2_txn_worker_flush_duration_sum{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",namespace=~\"$namespace\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (namespace,changefeed,instance,id) / sum(rate(ticdc_sinkv2_txn_worker_total_duration_sum{k8s_cluster=\"$k8s_cluster\",tidb_cluster=\"$tidb_cluster\",namespace=~\"$namespace\",changefeed=~\"$changefeed\",instance=~\"$ticdc_instance\"}[1m])) by (namespace,changefeed,instance,id) * 100",
           "interval": "",
-          "legendFormat": "{{changefeed}}-{{instance}}",
+          "legendFormat": "{{namespace}}-{{changefeed}}-{{instance}}-worker-{{id}}",
           "queryType": "randomWalk",
           "refId": "A"
         },
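
For context on the new worker loop, below is a minimal standalone Go sketch of the batch-then-flush timer pattern that worker.run now uses. It is an illustration only, not part of the change: event, onEvent, flush, flushInterval, and the size threshold are hypothetical stand-ins for txnEvent, worker.onEvent, backend.Flush, backend.MaxFlushInterval(), and the backend's own flush condition.

// Illustrative sketch only (assumed names): the batch-then-flush timer
// pattern, reduced to a standalone program.
package main

import (
    "fmt"
    "time"
)

type event struct{ id int }

// onEvent appends one event to the batch and reports whether the batch has
// reached a flushable state (a made-up size threshold here).
func onEvent(batch *[]event, e event) bool {
    *batch = append(*batch, e)
    return len(*batch) >= 4
}

// flush drains the batch.
func flush(batch *[]event) {
    fmt.Printf("flushed %d events\n", len(*batch))
    *batch = (*batch)[:0]
}

// run drains in, batching events until the batch is flushable or the flush
// interval expires, so that small batches are not flushed one event at a time.
func run(in <-chan event, flushInterval time.Duration) {
    var batch []event
    for e := range in {
        needFlush := onEvent(&batch, e)
        if !needFlush {
            delay := time.NewTimer(flushInterval)
            for !needFlush {
                select {
                case e, ok := <-in:
                    if !ok {
                        needFlush = true // channel closed: flush what we have
                    } else {
                        needFlush = onEvent(&batch, e)
                    }
                case <-delay.C:
                    needFlush = true // waited long enough: flush a small batch
                }
            }
            // Release the timer's resources promptly.
            if !delay.Stop() {
                select {
                case <-delay.C:
                default:
                }
            }
        }
        flush(&batch)
    }
    if len(batch) > 0 {
        flush(&batch)
    }
}

func main() {
    in := make(chan event)
    done := make(chan struct{})
    go func() {
        run(in, 10*time.Millisecond)
        close(done)
    }()
    for i := 0; i < 10; i++ {
        in <- event{id: i}
    }
    close(in)
    <-done
}

The timer bounds how long a partially filled batch can wait before being flushed, which is what replaces the old per-worker flush ticker and busy-ratio bookkeeping in the diff above.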