Skip to content

Commit

Permalink
metrics(ticdc): add resolved ts and add changefeed to dataflow (#4038) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 30, 2021
1 parent 5a6fc5c commit 981aaf5
Show file tree
Hide file tree
Showing 3 changed files with 2,825 additions and 2,946 deletions.
21 changes: 18 additions & 3 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type changefeed struct {

metricsChangefeedCheckpointTsGauge prometheus.Gauge
metricsChangefeedCheckpointTsLagGauge prometheus.Gauge
metricsChangefeedResolvedTsGauge prometheus.Gauge
metricsChangefeedResolvedTsLagGauge prometheus.Gauge

newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error)
newSink func(ctx cdcContext.Context) (AsyncSink, error)
Expand Down Expand Up @@ -258,6 +260,9 @@ LOOP:
// init metrics
c.metricsChangefeedCheckpointTsGauge = changefeedCheckpointTsGauge.WithLabelValues(c.id)
c.metricsChangefeedCheckpointTsLagGauge = changefeedCheckpointTsLagGauge.WithLabelValues(c.id)
c.metricsChangefeedResolvedTsGauge = changefeedResolvedTsGauge.WithLabelValues(c.id)
c.metricsChangefeedResolvedTsLagGauge = changefeedResolvedTsLagGauge.WithLabelValues(c.id)

c.initialized = true
return nil
}
Expand All @@ -279,10 +284,17 @@ func (c *changefeed) releaseResources() {
log.Warn("Closing sink failed in Owner", zap.String("changefeedID", c.state.ID), zap.Error(err))
}
c.wg.Wait()

changefeedCheckpointTsGauge.DeleteLabelValues(c.id)
changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id)
c.metricsChangefeedCheckpointTsGauge = nil
c.metricsChangefeedCheckpointTsLagGauge = nil

changefeedResolvedTsGauge.DeleteLabelValues(c.id)
changefeedResolvedTsLagGauge.DeleteLabelValues(c.id)
c.metricsChangefeedResolvedTsGauge = nil
c.metricsChangefeedResolvedTsLagGauge = nil

c.initialized = false
}

Expand Down Expand Up @@ -461,10 +473,13 @@ func (c *changefeed) updateStatus(currentTs int64, barrierTs model.Ts) {
}
return status, changed, nil
})
phyTs := oracle.ExtractPhysical(checkpointTs)
phyCkpTs := oracle.ExtractPhysical(checkpointTs)
c.metricsChangefeedCheckpointTsGauge.Set(float64(phyCkpTs))
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyCkpTs) / 1e3)

c.metricsChangefeedCheckpointTsGauge.Set(float64(phyTs))
c.metricsChangefeedCheckpointTsLagGauge.Set(float64(currentTs-phyTs) / 1e3)
phyRTs := oracle.ExtractPhysical(resolvedTs)
c.metricsChangefeedResolvedTsGauge.Set(float64(phyRTs))
c.metricsChangefeedResolvedTsLagGauge.Set(float64(currentTs-phyRTs) / 1e3)
}

func (c *changefeed) Close() {
Expand Down
18 changes: 17 additions & 1 deletion cdc/owner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,21 @@ var (
Namespace: "ticdc",
Subsystem: "owner",
Name: "checkpoint_ts_lag",
Help: "checkpoint ts lag of changefeeds",
Help: "checkpoint ts lag of changefeeds in seconds",
}, []string{"changefeed"})
changefeedResolvedTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "resolved_ts",
Help: "resolved ts of changefeeds",
}, []string{"changefeed"})
changefeedResolvedTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "resolved_ts_lag",
Help: "resolved ts lag of changefeeds in seconds",
}, []string{"changefeed"})
ownershipCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Expand Down Expand Up @@ -63,7 +77,9 @@ const (
// InitMetrics registers all metrics used in owner
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(changefeedCheckpointTsGauge)
registry.MustRegister(changefeedResolvedTsGauge)
registry.MustRegister(changefeedCheckpointTsLagGauge)
registry.MustRegister(changefeedResolvedTsLagGauge)
registry.MustRegister(ownershipCounter)
registry.MustRegister(ownerMaintainTableNumGauge)
registry.MustRegister(changefeedStatusGauge)
Expand Down
Loading

0 comments on commit 981aaf5

Please sign in to comment.