changefeedccl: add metrics for paused changefeed jobs
Paused changefeed jobs will now show up as a gauge
(jobs.changefeed.currently_paused) in the debug UI. This metric will also
be added to telemetry.

Resolves: #85467
Release note: None
jayshrivastava committed Oct 17, 2022
1 parent d6c28f5 commit 1706650
Showing 4 changed files with 95 additions and 5 deletions.
45 changes: 45 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -339,6 +339,51 @@ func TestChangefeedIdleness(t *testing.T) {
	}, feedTestEnterpriseSinks)
}

func TestChangefeedPausedMetrics(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	cdcTest(t, func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
		sqlDB := sqlutils.MakeSQLRunner(s.DB)
		registry := s.Server.JobRegistry().(*jobs.Registry)
		currentlyPaused := registry.MetricsStruct().JobMetrics[jobspb.TypeChangefeed].CurrentlyPaused
		waitForPausedCount := func(numPaused int64) {
			testutils.SucceedsSoon(t, func() error {
				if currentlyPaused.Value() != numPaused {
					return fmt.Errorf("expected %d paused changefeeds, found %d", numPaused, currentlyPaused.Value())
				}
				return nil
			})
		}

		sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
		sqlDB.Exec(t, `CREATE TABLE bar (b INT PRIMARY KEY)`)
		cf1 := feed(t, f, "CREATE CHANGEFEED FOR TABLE foo")
		cf2 := feed(t, f, "CREATE CHANGEFEED FOR TABLE bar")
		sqlDB.Exec(t, `INSERT INTO foo VALUES (0)`)
		sqlDB.Exec(t, `INSERT INTO bar VALUES (0)`)
		waitForPausedCount(0)

		jobFeed1 := cf1.(cdctest.EnterpriseTestFeed)
		jobFeed2 := cf2.(cdctest.EnterpriseTestFeed)

		require.NoError(t, jobFeed1.Pause())
		waitForPausedCount(1)

		require.NoError(t, jobFeed1.Resume())
		waitForPausedCount(0)

		require.NoError(t, jobFeed1.Pause())
		require.NoError(t, jobFeed2.Pause())
		waitForPausedCount(2)

		closeFeed(t, cf1)
		waitForPausedCount(1)
		closeFeed(t, cf2)
		waitForPausedCount(0)
	}, feedTestEnterpriseSinks)
}

// TestChangefeedSendError validates that SendErrors do not fail the changefeed
// as they can occur in normal situations such as a cluster update
func TestChangefeedSendError(t *testing.T) {
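The waitForPausedCount helper above leans on testutils.SucceedsSoon, which retries an assertion until it stops returning an error. Below is a dependency-free sketch of that wait-for-gauge pattern; the gauge interface, stubGauge, and waitForValue names are illustrative stand-ins, not the CockroachDB API.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// gauge is the one method the helper needs; in the test above the
// concrete type is *metric.Gauge, read via Value().
type gauge interface{ Value() int64 }

// stubGauge stands in for *metric.Gauge so the sketch runs standalone.
type stubGauge struct{ v atomic.Int64 }

func (g *stubGauge) Value() int64 { return g.v.Load() }

// waitForValue polls g until it reports want, giving up after timeout.
// testutils.SucceedsSoon plays this retry-until-no-error role in the test.
func waitForValue(g gauge, want int64, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		got := g.Value()
		if got == want {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("expected %d paused jobs, found %d", want, got)
		}
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	g := &stubGauge{}
	go func() { // simulate two pause requests being served asynchronously
		time.Sleep(50 * time.Millisecond)
		g.v.Store(2)
	}()
	fmt.Println(waitForValue(g, 2, time.Second)) // <nil>
}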
23 changes: 20 additions & 3 deletions pkg/jobs/adopt.go
@@ -437,11 +437,15 @@ const pauseAndCancelUpdate = `
claim_instance_id = NULL
WHERE (status IN ('` + string(StatusPauseRequested) + `', '` + string(StatusCancelRequested) + `'))
AND ((claim_session_id = $1) AND (claim_instance_id = $2))
-RETURNING id, status
+RETURNING id, status, payload
`

func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqlliveness.Session) error {
-	return r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+	metricUpdates := make(map[jobspb.Type]int)
+	err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+		// In case of transaction retries, reset this map here.
+		metricUpdates = make(map[jobspb.Type]int)
+
		// Run the claim transaction at low priority to ensure that it does not
		// contend with foreground reads.
		if err := txn.SetUserPriority(roachpb.MinUserPriority); err != nil {
@@ -464,8 +468,15 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqlliveness.Session) error {
			statusString := *row[1].(*tree.DString)
			switch Status(statusString) {
			case StatusPaused:
				payload, err := UnmarshalPayload(row[2])
				if err != nil {
					return err
				}
				metricUpdates[payload.Type()]++
				r.cancelRegisteredJobContext(id)
				log.Infof(ctx, "job %d, session %s: paused", id, s.ID())
			case StatusReverting:
				if err := job.Update(ctx, txn, func(txn *kv.Txn, md JobMetadata, ju *JobUpdater) error {
					r.cancelRegisteredJobContext(id)
@@ -489,4 +500,10 @@ func (r *Registry) servePauseAndCancelRequests(ctx context.Context, s sqlliveness.Session) error {
		}
		return nil
	})
	if err == nil {
		// Only bump the gauges once the transaction has committed.
		for typ, count := range metricUpdates {
			r.metrics.JobMetrics[typ].CurrentlyPaused.Inc(int64(count))
		}
	}
	return err
}
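A detail worth noting in adopt.go: the per-type counts are accumulated in a plain map inside the transaction closure, reset at the top of each attempt, and only applied to the gauges after the transaction commits; incrementing the gauge inside the closure would double-count whenever the transaction retries. A minimal standalone sketch of that deferred-update pattern, with runTxn simulating the retry behavior of the real kv transaction runner:

package main

import "fmt"

// runTxn simulates a retryable transaction runner such as db.Txn: the
// closure can execute several times before one attempt commits.
func runTxn(fn func() error) error {
	const retries = 2 // pretend the first two attempts hit retryable errors
	for attempt := 0; ; attempt++ {
		if err := fn(); err != nil {
			return err
		}
		if attempt >= retries {
			return nil // committed
		}
	}
}

func main() {
	deltas := make(map[string]int)
	err := runTxn(func() error {
		// Reset at the top of every attempt so retries cannot double-count.
		deltas = make(map[string]int)
		deltas["changefeed"] += 2 // e.g. two changefeed jobs observed pausing
		return nil
	})
	if err != nil {
		panic(err)
	}
	// Apply to the gauges only after the commit succeeds, mirroring
	// CurrentlyPaused.Inc(int64(count)) above.
	for typ, count := range deltas {
		fmt.Printf("jobs.%s.currently_paused += %d\n", typ, count)
	}
}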
13 changes: 13 additions & 0 deletions pkg/jobs/metrics.go
@@ -47,6 +47,7 @@ type Metrics struct {
type JobTypeMetrics struct {
	CurrentlyRunning *metric.Gauge
	CurrentlyIdle    *metric.Gauge
	CurrentlyPaused  *metric.Gauge
	ResumeCompleted  *metric.Counter
	ResumeRetryError *metric.Counter
	ResumeFailed     *metric.Counter
@@ -82,6 +83,17 @@ func makeMetaCurrentlyIdle(typeStr string) metric.Metadata {
	}
}

func makeMetaCurrentlyPaused(typeStr string) metric.Metadata {
	return metric.Metadata{
		Name: fmt.Sprintf("jobs.%s.currently_paused", typeStr),
		Help: fmt.Sprintf("Number of %s jobs currently considered Paused",
			typeStr),
		Measurement: "jobs",
		Unit:        metric.Unit_COUNT,
		MetricType:  io_prometheus_client.MetricType_GAUGE,
	}
}

func makeMetaResumeCompeted(typeStr string) metric.Metadata {
	return metric.Metadata{
		Name: fmt.Sprintf("jobs.%s.resume_completed", typeStr),

@@ -214,6 +226,7 @@ func (m *Metrics) init(histogramWindowInterval time.Duration) {
		m.JobMetrics[jt] = &JobTypeMetrics{
			CurrentlyRunning: metric.NewGauge(makeMetaCurrentlyRunning(typeStr)),
			CurrentlyIdle:    metric.NewGauge(makeMetaCurrentlyIdle(typeStr)),
			CurrentlyPaused:  metric.NewGauge(makeMetaCurrentlyPaused(typeStr)),
			ResumeCompleted:  metric.NewCounter(makeMetaResumeCompeted(typeStr)),
			ResumeRetryError: metric.NewCounter(makeMetaResumeRetryError(typeStr)),
			ResumeFailed:     metric.NewCounter(makeMetaResumeFailed(typeStr)),
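For changefeed jobs the metadata above expands to the gauge name jobs.changefeed.currently_paused, which is what the debug UI surfaces. A small sketch of that name expansion; the lowercase/underscore derivation of typeStr from the job type's name is an assumption about the registry's naming scheme, not code from this commit:

package main

import (
	"fmt"
	"strings"
)

// currentlyPausedName mirrors makeMetaCurrentlyPaused above.
func currentlyPausedName(typeStr string) string {
	return fmt.Sprintf("jobs.%s.currently_paused", typeStr)
}

func main() {
	// Assumed derivation of typeStr from the proto job-type name.
	for _, jobType := range []string{"CHANGEFEED", "BACKUP", "SCHEMA CHANGE"} {
		typeStr := strings.ReplaceAll(strings.ToLower(jobType), " ", "_")
		fmt.Println(currentlyPausedName(typeStr))
	}
	// Output:
	// jobs.changefeed.currently_paused
	// jobs.backup.currently_paused
	// jobs.schema_change.currently_paused
}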
19 changes: 17 additions & 2 deletions pkg/jobs/update.go
@@ -128,9 +128,9 @@ func UpdateHighwaterProgressed(highWater hlc.Timestamp, md JobMetadata, ju *JobUpdater) error {
// if md.Status != StatusRunning {
// return errors.New("job no longer running")
// }
-// md.UpdateStatus(StatusPaused)
+// ju.UpdateStatus(StatusPaused)
// // <modify md.Payload>
-// md.UpdatePayload(md.Payload)
+// ju.UpdatePayload(md.Payload)
// }
//
// Note that there are various convenience wrappers (like FractionProgressed)
@@ -149,7 +149,10 @@ func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateFn updateFn) error {
	var status Status
	var runStats *RunStats

	metricUpdates := make(map[jobspb.Type]int)

	if err := j.runInTxn(ctx, txn, func(ctx context.Context, txn *kv.Txn) error {
		// Reset on every transaction retry so the deltas are not double-counted.
		metricUpdates = make(map[jobspb.Type]int)
		payload, progress, runStats = nil, nil, nil
		var err error
		var row tree.Datums
@@ -174,6 +177,7 @@ func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateFn updateFn) error {
		if progress, err = UnmarshalProgress(row[2]); err != nil {
			return err
		}
		typ := payload.Type()
		if j.session != nil {
			if row[3] == tree.DNull {
				return errors.Errorf(
@@ -231,6 +235,13 @@ func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateFn updateFn) error {
			return nil
		}

		if ju.md.Status != StatusPaused && md.Status == StatusPaused {
			// The job is stored as paused and this update moves it to another
			// status; record a decrement to apply after the txn commits.
			metricUpdates[typ]++
		}

		// Build a statement of the following form, depending on which properties
		// need updating:
		//
@@ -294,6 +305,10 @@ func (j *Job) update(ctx context.Context, txn *kv.Txn, useReadLock bool, updateFn updateFn) error {
		return nil
	}); err != nil {
		return errors.Wrapf(err, "job %d", j.id)
	} else {
		// The transaction committed; apply the deferred decrements.
		for typ, count := range metricUpdates {
			j.registry.metrics.JobMetrics[typ].CurrentlyPaused.Dec(int64(count))
		}
	}
	func() {
		j.mu.Lock()
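Taken together, the two halves keep the gauge balanced: adopt.go increments CurrentlyPaused when a pause request is served, and the update path above decrements it when a job stored as paused transitions to any other status, in both cases deferring the metric write until the transaction has committed. A toy model of that lifecycle; the status type, pausedGauge, and setStatus are stand-ins, not the jobs package API:

package main

import "fmt"

type status string

const (
	statusRunning status = "running"
	statusPaused  status = "paused"
)

// pausedGauge models JobTypeMetrics.CurrentlyPaused for a single job type.
var pausedGauge int64

// setStatus applies a status change and keeps the gauge in sync, following
// the transition checks used in adopt.go (entering paused) and update.go
// (leaving paused).
func setStatus(stored *status, next status) {
	if next == statusPaused && *stored != statusPaused {
		pausedGauge++ // adopt.go: a pause request was served
	}
	if next != statusPaused && *stored == statusPaused {
		pausedGauge-- // update.go: the job leaves the paused state
	}
	*stored = next
}

func main() {
	st := statusRunning
	setStatus(&st, statusPaused)
	fmt.Println(pausedGauge) // 1
	setStatus(&st, statusRunning)
	fmt.Println(pausedGauge) // 0
}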
