Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/WizardXiao/ticdc into mul…
Browse files Browse the repository at this point in the history
…tiple-rows-use-downstream-schema
  • Loading branch information
WizardXiao committed Nov 9, 2021
2 parents b7840fb + 55e8eae commit 285e91c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 18 deletions.
6 changes: 3 additions & 3 deletions pkg/actor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ var (
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "actor",
Name: "workers_cpu_seconds_total",
Help: "Total working time spent in seconds.",
}, []string{"name"})
Name: "worker_cpu_seconds_total",
Help: "Total user and system CPU time spent by workers in seconds.",
}, []string{"name", "id"})
batchSizeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Expand Down
42 changes: 27 additions & 15 deletions pkg/actor/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"container/list"
"context"
"runtime"
"runtime/pprof"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -324,6 +326,11 @@ func (b *SystemBuilder) handleFatal(
// Build builds a system and a router.
func (b *SystemBuilder) Build() (*System, *Router) {
router := newRouter(b.name)
metricWorkingDurations := make([]prometheus.Counter, b.numWorker)
for i := range metricWorkingDurations {
metricWorkingDurations[i] =
workingDuration.WithLabelValues(b.name, strconv.Itoa(i))
}
return &System{
name: b.name,
numWorker: b.numWorker,
Expand All @@ -335,12 +342,12 @@ func (b *SystemBuilder) Build() (*System, *Router) {

fatalHandler: b.fatalHandler,

metricTotalWorkers: totalWorkers.WithLabelValues(b.name),
metricWorkingWorkers: workingWorkers.WithLabelValues(b.name),
metricWorkingDuration: workingDuration.WithLabelValues(b.name),
metricPollDuration: pollActorDuration.WithLabelValues(b.name),
metricProcBatch: batchSizeHistogram.WithLabelValues(b.name, "proc"),
metricMsgBatch: batchSizeHistogram.WithLabelValues(b.name, "msg"),
metricTotalWorkers: totalWorkers.WithLabelValues(b.name),
metricWorkingWorkers: workingWorkers.WithLabelValues(b.name),
metricWorkingDurations: metricWorkingDurations,
metricPollDuration: pollActorDuration.WithLabelValues(b.name),
metricProcBatch: batchSizeHistogram.WithLabelValues(b.name, "proc"),
metricMsgBatch: batchSizeHistogram.WithLabelValues(b.name, "msg"),
}, router
}

Expand All @@ -359,12 +366,12 @@ type System struct {
fatalHandler func(string, ID)

// Metrics
metricTotalWorkers prometheus.Gauge
metricWorkingWorkers prometheus.Gauge
metricWorkingDuration prometheus.Counter
metricPollDuration prometheus.Observer
metricProcBatch prometheus.Observer
metricMsgBatch prometheus.Observer
metricTotalWorkers prometheus.Gauge
metricWorkingWorkers prometheus.Gauge
metricWorkingDurations []prometheus.Counter
metricPollDuration prometheus.Observer
metricProcBatch prometheus.Observer
metricMsgBatch prometheus.Observer
}

// Start the system. Cancelling the context to stop the system.
Expand All @@ -375,8 +382,13 @@ func (s *System) Start(ctx context.Context) {

s.metricTotalWorkers.Add(float64(s.numWorker))
for i := 0; i < s.numWorker; i++ {
id := i
s.wg.Go(func() error {
s.poll(ctx)
defer pprof.SetGoroutineLabels(ctx)
pctx := pprof.WithLabels(ctx, pprof.Labels("actor", s.name))
pprof.SetGoroutineLabels(pctx)

s.poll(pctx, id)
return nil
})
}
Expand Down Expand Up @@ -404,7 +416,7 @@ func (s *System) Spawn(mb Mailbox, actor Actor) error {
const slowReceiveThreshold = time.Second

// The main poll of actor system.
func (s *System) poll(ctx context.Context) {
func (s *System) poll(ctx context.Context, id int) {
batchPBuf := make([]*proc, s.actorBatchSize)
batchMsgBuf := make([]message.Message, s.msgBatchSizePerActor)
rd := s.rd
Expand All @@ -427,7 +439,7 @@ func (s *System) poll(ctx context.Context) {
break
}
// Recording metrics.
s.metricWorkingDuration.Add(time.Since(startTime).Seconds())
s.metricWorkingDurations[id].Add(time.Since(startTime).Seconds())
s.metricWorkingWorkers.Dec()
// Park the poll until it is awakened.
rd.cond.Wait()
Expand Down

0 comments on commit 285e91c

Please sign in to comment.