Skip to content

Commit

Permalink
metrics(ticdc): fix slowest table metrics in scheduler (pingcap#10484) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored and CharlesCheung96 committed Feb 26, 2024
1 parent da5ba90 commit ae0e285
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 4 deletions.
2 changes: 1 addition & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,7 @@ func (c *changefeed) initMetrics() {

// releaseResources is idempotent.
func (c *changefeed) releaseResources(ctx cdcContext.Context) {
c.cleanupMetrics()
if c.isReleased {
return
}
Expand Down Expand Up @@ -745,7 +746,6 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
_ = c.downstreamObserver.Close()
}

c.cleanupMetrics()
c.schema = nil
c.barriers = nil
c.initialized = false
Expand Down
8 changes: 6 additions & 2 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ func (r *Manager) AdvanceCheckpoint(

r.slowestPuller = tablepb.Span{}
r.slowestSink = tablepb.Span{}
resolvedTsOfSlowestSink := model.Ts(math.MaxUint64)

watermark = schedulepb.Watermark{
CheckpointTs: math.MaxUint64,
Expand Down Expand Up @@ -625,9 +626,12 @@ func (r *Manager) AdvanceCheckpoint(
}

// Find the minimum checkpoint ts and resolved ts.
if watermark.CheckpointTs > table.Checkpoint.CheckpointTs {
if watermark.CheckpointTs > table.Checkpoint.CheckpointTs ||
(watermark.CheckpointTs == table.Checkpoint.CheckpointTs &&
resolvedTsOfSlowestSink > table.Checkpoint.ResolvedTs) {
watermark.CheckpointTs = table.Checkpoint.CheckpointTs
r.slowestSink = span
resolvedTsOfSlowestSink = table.Checkpoint.ResolvedTs
}
if watermark.ResolvedTs > table.Checkpoint.ResolvedTs {
watermark.ResolvedTs = table.Checkpoint.ResolvedTs
Expand Down Expand Up @@ -854,7 +858,7 @@ func (r *Manager) CollectMetrics() {
Set(float64(counter))
}

if table, ok := r.spans.Get(r.slowestSink); ok {
if table, ok := r.spans.Get(r.slowestPuller); ok {
if pullerCkpt, ok := table.Stats.StageCheckpoints["puller-egress"]; ok {
phyCkptTs := oracle.ExtractPhysical(pullerCkpt.ResolvedTs)
slowestTablePullerResolvedTs.WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyCkptTs))
Expand Down
2 changes: 1 addition & 1 deletion metrics/alertmanager/ticdc.rules.yml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ groups:

# tikv related alter rules
- alert: tikv_cdc_min_resolved_ts_no_change_for_1m
expr: changes(tikv_cdc_min_resolved_ts[1m]) < 1 and ON (instance) tikv_cdc_region_resolve_status{status="resolved"} > 0
expr: changes(tikv_cdc_min_resolved_ts[1m]) < 1 and ON (instance) tikv_cdc_region_resolve_status{status="resolved"} > 0 and ON (instance) tikv_cdc_captured_region_total > 0
for: 1m
labels:
env: ENV_LABELS_ENV
Expand Down

0 comments on commit ae0e285

Please sign in to comment.