Skip to content

Commit

Permalink
sql, sqlstats: update sql stats compaction job async
Browse files Browse the repository at this point in the history
Closes #85582

Currently, during sql stats compaction job scheduling, the schedule is
retrieved by querying the `system.scheduled_jobs` table. The function
handling scheduling is called synchronously during node startup to ensure the
initial schedule is created, and can thus block node startup if the query
experiences contention.

This commit changes the scheduling setup to be called asynchronously using a
channel notification to ensure the stats collector does not block node startup.
The callback for changing the cluster setting `SQLStatsCleanupRecurrence` is
also modified to match this behaviour, now issuing a channel notification to
update the schedule to the new value.

Release justification: bug fix, low risk update to existing functionality

Release note: None
  • Loading branch information
xinhaoz committed Aug 28, 2022
1 parent 5cc1afa commit 6ae1b03
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 47 deletions.
1 change: 1 addition & 0 deletions pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_library(
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/retry",
"//pkg/util/stop",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
Expand Down
103 changes: 58 additions & 45 deletions pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand All @@ -31,7 +32,7 @@ import (
// We don't need this monitor to run very frequently. Normally, the schedule
// should remain in the system table once it is created. However, some operations
// such as RESTORE would wipe the system table and populate it with the data
// from BAKCUP. In this case, it would be nice for us to preemptively check
// from BACKUP. In this case, it would be nice for us to preemptively check
// for the abnormal state of the schedule and restore it.
var defaultScanInterval = time.Hour * 6

Expand All @@ -56,7 +57,7 @@ var longIntervalWarningThreshold = time.Hour * 24

// jobMonitor monitors the system.scheduled_jobs table to ensure that we would
// always have one sql stats scheduled compaction job running.
// It immediately performs this check upon start() and runs the check
// It performs this check immediately upon start() and runs the check
// periodically every scanInterval (subject to jittering).
type jobMonitor struct {
st *cluster.Settings
Expand All @@ -67,43 +68,36 @@ type jobMonitor struct {
}

func (j *jobMonitor) start(ctx context.Context, stopper *stop.Stopper) {
j.ensureSchedule(ctx)
j.registerClusterSettingHook()

_ = stopper.RunAsyncTask(ctx, "sql-stats-scheduled-compaction-job-monitor", func(ctx context.Context) {
nextJobScheduleCheck := timeutil.Now()
currentRecurrence := SQLStatsCleanupRecurrence.Get(&j.st.SV)

stopCtx, cancel := stopper.WithCancelOnQuiesce(ctx)
defer cancel()

timer := timeutil.NewTimer()
// Ensure schedule at startup.
timer.Reset(0)
defer timer.Stop()

// This loop runs every minute to check if we need to update the job schedule.
// We only hit the jobs table if the schedule needs to be updated due to a
// change in the recurrence cluster setting or as a scheduled check to
// ensure the schedule exists, which defaults to every 6 hours.
for {
timer.Reset(j.jitterFn(j.scanInterval))
select {
case <-timer.C:
timer.Read = true
case <-stopper.ShouldQuiesce():
case <-stopCtx.Done():
return
}
j.ensureSchedule(ctx)
}
})
}

func (j *jobMonitor) registerClusterSettingHook() {
SQLStatsCleanupRecurrence.SetOnChange(&j.st.SV, func(ctx context.Context) {
j.ensureSchedule(ctx)
if err := j.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
sj, err := j.getSchedule(ctx, txn)
if err != nil {
return err
if SQLStatsCleanupRecurrence.Get(&j.st.SV) != currentRecurrence || nextJobScheduleCheck.Before(timeutil.Now()) {
j.updateSchedule(stopCtx, j.ie)
nextJobScheduleCheck = timeutil.Now().Add(j.jitterFn(j.scanInterval))
currentRecurrence = SQLStatsCleanupRecurrence.Get(&j.st.SV)
}
cronExpr := SQLStatsCleanupRecurrence.Get(&j.st.SV)
if err = sj.SetSchedule(cronExpr); err != nil {
return err
}
if err = CheckScheduleAnomaly(sj); err != nil {
log.Warningf(ctx, "schedule anomaly detected, disabled sql stats compaction may cause performance impact: %s", err)
}
return sj.Update(ctx, j.ie, txn)
}); err != nil {
log.Errorf(ctx, "unable to find sqlstats clean up schedule: %s", err)

timer.Reset(time.Minute)
}
})
}
Expand Down Expand Up @@ -139,31 +133,50 @@ func (j *jobMonitor) getSchedule(
return sj, nil
}

func (j *jobMonitor) ensureSchedule(ctx context.Context) {
func (j *jobMonitor) updateSchedule(ctx context.Context, ie sqlutil.InternalExecutor) {
var sj *jobs.ScheduledJob
var err error
if err = j.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// We check if we can get load the schedule, if the schedule cannot be
// loaded because it's not found, we recreate the schedule.
sj, err = j.getSchedule(ctx, txn)
if err != nil {
if !jobs.HasScheduledJobNotFoundError(err) && !errors.Is(err, errScheduleNotFound) {
return err
}
sj, err = CreateSQLStatsCompactionScheduleIfNotYetExist(ctx, j.ie, txn, j.st)
retryOptions := retry.Options{
InitialBackoff: time.Second,
MaxBackoff: 10 * time.Minute,
}
for r := retry.StartWithCtx(ctx, retryOptions); r.Next(); {
if err = j.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// We check whether we can load the schedule; if the schedule cannot be
// loaded because it's not found, we recreate the schedule.
sj, err = j.getSchedule(ctx, txn)
if err != nil {
if !jobs.HasScheduledJobNotFoundError(err) && !errors.Is(err, errScheduleNotFound) {
return err
}
sj, err = CreateSQLStatsCompactionScheduleIfNotYetExist(ctx, j.ie, txn, j.st)
if err != nil {
return err
}
}
// Update schedule with new recurrence, if different.
cronExpr := SQLStatsCleanupRecurrence.Get(&j.st.SV)
if sj.ScheduleExpr() == cronExpr {
return nil
}
if err := sj.SetSchedule(cronExpr); err != nil {
return err
}
sj.SetScheduleStatus(string(jobs.StatusPending))
return sj.Update(ctx, ie, txn)
}); err != nil && ctx.Err() == nil {
log.Errorf(ctx, "failed to update stats scheduled compaction job: %s", err)
} else {
break
}
return nil
}); err != nil {
log.Errorf(ctx, "fail to ensure sql stats scheduled compaction job is created: %s", err)
return
}

if err = CheckScheduleAnomaly(sj); err != nil {
log.Warningf(ctx, "schedule anomaly detected: %s", err)
if ctx.Err() == nil {
if err = CheckScheduleAnomaly(sj); err != nil {
log.Warningf(ctx, "schedule anomaly detected, disabling sql stats compaction may cause performance impact: %s", err)
}
}

}

// CheckScheduleAnomaly checks a given schedule to see if it is either paused
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -177,9 +178,11 @@ func TestScheduledSQLStatsCompaction(t *testing.T) {
func TestSQLStatsScheduleOperations(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.UnderStressRace(t, "test is too slow to run under race")

ctx := context.Background()
helper, helperCleanup := newTestHelper(t, nil /* sqlStatsKnobs */)
helper.sqlDB.SucceedsSoonDuration = 2 * time.Minute
defer helperCleanup()

schedID := getSQLStatsCompactionSchedule(t, helper).ScheduleID()
Expand Down Expand Up @@ -215,7 +218,7 @@ func TestSQLStatsScheduleOperations(t *testing.T) {
helper.sqlDB.Exec(t, "SET CLUSTER SETTING sql.stats.cleanup.recurrence = $1", expr)

var err error
testutils.SucceedsSoon(t, func() error {
testutils.SucceedsWithin(t, func() error {
// Reload schedule from DB.
sj := getSQLStatsCompactionSchedule(t, helper)
err = persistedsqlstats.CheckScheduleAnomaly(sj)
Expand All @@ -224,7 +227,8 @@ func TestSQLStatsScheduleOperations(t *testing.T) {
}
require.Equal(t, expr, sj.ScheduleExpr())
return nil
})
}, time.Minute*2)

require.True(t, errors.Is(
errors.Unwrap(err), persistedsqlstats.ErrScheduleIntervalTooLong),
"expected ErrScheduleIntervalTooLong, but found %+v", err)
Expand Down

0 comments on commit 6ae1b03

Please sign in to comment.