Skip to content

Commit

Permalink
metrics(ticdc): fix slowest table metrics in scheduler (#10484)
Browse files Browse the repository at this point in the history
close #10482
  • Loading branch information
CharlesCheung96 committed Jan 23, 2024
1 parent 62d1d01 commit 8f00d7a
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ func (r *Manager) AdvanceCheckpoint(

r.slowestPuller = model.TableID(0)
r.slowestSink = model.TableID(0)
resolvedTsOfSlowestSink := model.Ts(math.MaxUint64)

watermark = schedulepb.Watermark{
CheckpointTs: math.MaxUint64,
Expand Down Expand Up @@ -607,9 +608,12 @@ func (r *Manager) AdvanceCheckpoint(
}
}
// Find the minimum checkpoint ts and resolved ts.
if watermark.CheckpointTs > table.Checkpoint.CheckpointTs {
if watermark.CheckpointTs > table.Checkpoint.CheckpointTs ||
(watermark.CheckpointTs == table.Checkpoint.CheckpointTs &&
resolvedTsOfSlowestSink > table.Checkpoint.ResolvedTs) {
watermark.CheckpointTs = table.Checkpoint.CheckpointTs
r.slowestSink = tableID
resolvedTsOfSlowestSink = table.Checkpoint.ResolvedTs
}
if watermark.ResolvedTs > table.Checkpoint.ResolvedTs {
watermark.ResolvedTs = table.Checkpoint.ResolvedTs
Expand Down Expand Up @@ -800,7 +804,7 @@ func (r *Manager) CollectMetrics() {
Set(float64(counter))
}

if table, ok := r.tables[r.slowestSink]; ok {
if table, ok := r.tables[r.slowestPuller]; ok {
if pullerCkpt, ok := table.Stats.StageCheckpoints["puller-egress"]; ok {
phyCkptTs := oracle.ExtractPhysical(pullerCkpt.ResolvedTs)
slowestTablePullerResolvedTs.WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyCkptTs))
Expand Down

0 comments on commit 8f00d7a

Please sign in to comment.