diff --git a/pkg/actor/metrics.go b/pkg/actor/metrics.go index a2fbd3534e6..ea5f4f535c1 100644 --- a/pkg/actor/metrics.go +++ b/pkg/actor/metrics.go @@ -36,9 +36,9 @@ var ( prometheus.CounterOpts{ Namespace: "ticdc", Subsystem: "actor", - Name: "workers_cpu_seconds_total", - Help: "Total working time spent in seconds.", - }, []string{"name"}) + Name: "worker_cpu_seconds_total", + Help: "Total user and system CPU time spent by workers in seconds.", + }, []string{"name", "id"}) batchSizeHistogram = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: "ticdc", diff --git a/pkg/actor/system.go b/pkg/actor/system.go index ef5dfe118a4..808eff4b1ab 100644 --- a/pkg/actor/system.go +++ b/pkg/actor/system.go @@ -17,6 +17,8 @@ import ( "container/list" "context" "runtime" + "runtime/pprof" + "strconv" "sync" "sync/atomic" "time" @@ -324,6 +326,11 @@ func (b *SystemBuilder) handleFatal( // Build builds a system and a router. func (b *SystemBuilder) Build() (*System, *Router) { router := newRouter(b.name) + metricWorkingDurations := make([]prometheus.Counter, b.numWorker) + for i := range metricWorkingDurations { + metricWorkingDurations[i] = + workingDuration.WithLabelValues(b.name, strconv.Itoa(i)) + } return &System{ name: b.name, numWorker: b.numWorker, @@ -335,12 +342,12 @@ func (b *SystemBuilder) Build() (*System, *Router) { fatalHandler: b.fatalHandler, - metricTotalWorkers: totalWorkers.WithLabelValues(b.name), - metricWorkingWorkers: workingWorkers.WithLabelValues(b.name), - metricWorkingDuration: workingDuration.WithLabelValues(b.name), - metricPollDuration: pollActorDuration.WithLabelValues(b.name), - metricProcBatch: batchSizeHistogram.WithLabelValues(b.name, "proc"), - metricMsgBatch: batchSizeHistogram.WithLabelValues(b.name, "msg"), + metricTotalWorkers: totalWorkers.WithLabelValues(b.name), + metricWorkingWorkers: workingWorkers.WithLabelValues(b.name), + metricWorkingDurations: metricWorkingDurations, + metricPollDuration: pollActorDuration.WithLabelValues(b.name), + metricProcBatch: batchSizeHistogram.WithLabelValues(b.name, "proc"), + metricMsgBatch: batchSizeHistogram.WithLabelValues(b.name, "msg"), }, router } @@ -359,12 +366,12 @@ type System struct { fatalHandler func(string, ID) // Metrics - metricTotalWorkers prometheus.Gauge - metricWorkingWorkers prometheus.Gauge - metricWorkingDuration prometheus.Counter - metricPollDuration prometheus.Observer - metricProcBatch prometheus.Observer - metricMsgBatch prometheus.Observer + metricTotalWorkers prometheus.Gauge + metricWorkingWorkers prometheus.Gauge + metricWorkingDurations []prometheus.Counter + metricPollDuration prometheus.Observer + metricProcBatch prometheus.Observer + metricMsgBatch prometheus.Observer } // Start the system. Cancelling the context to stop the system. @@ -375,8 +382,13 @@ func (s *System) Start(ctx context.Context) { s.metricTotalWorkers.Add(float64(s.numWorker)) for i := 0; i < s.numWorker; i++ { + id := i s.wg.Go(func() error { - s.poll(ctx) + defer pprof.SetGoroutineLabels(ctx) + pctx := pprof.WithLabels(ctx, pprof.Labels("actor", s.name)) + pprof.SetGoroutineLabels(pctx) + + s.poll(pctx, id) return nil }) } @@ -404,7 +416,7 @@ func (s *System) Spawn(mb Mailbox, actor Actor) error { const slowReceiveThreshold = time.Second // The main poll of actor system. -func (s *System) poll(ctx context.Context) { +func (s *System) poll(ctx context.Context, id int) { batchPBuf := make([]*proc, s.actorBatchSize) batchMsgBuf := make([]message.Message, s.msgBatchSizePerActor) rd := s.rd @@ -427,7 +439,7 @@ func (s *System) poll(ctx context.Context) { break } // Recording metrics. - s.metricWorkingDuration.Add(time.Since(startTime).Seconds()) + s.metricWorkingDurations[id].Add(time.Since(startTime).Seconds()) s.metricWorkingWorkers.Dec() // Park the poll until it is awakened. rd.cond.Wait()