diff --git a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel index b3aa0ad65fed..b1cf70be32f8 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel +++ b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel @@ -58,6 +58,7 @@ go_test( "compaction_test.go", "controller_test.go", "flush_test.go", + "job_monitor_test.go", "main_test.go", "reader_test.go", "scheduled_sql_stats_compaction_test.go", @@ -65,6 +66,7 @@ go_test( deps = [ ":persistedsqlstats", "//pkg/base", + "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/jobs/jobstest", diff --git a/pkg/sql/sqlstats/persistedsqlstats/job_monitor_test.go b/pkg/sql/sqlstats/persistedsqlstats/job_monitor_test.go new file mode 100644 index 000000000000..9177817c0ae0 --- /dev/null +++ b/pkg/sql/sqlstats/persistedsqlstats/job_monitor_test.go @@ -0,0 +1,63 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package persistedsqlstats_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/clusterversion" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +func TestVersionGating(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + params, _ := tests.CreateTestServerParams() + params.Knobs.Server = &server.TestingKnobs{ + DisableAutomaticVersionUpgrade: 1, + BinaryVersionOverride: clusterversion.ByKey(clusterversion.SQLStatsCompactionScheduledJob - 1), + } + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: params, + }) + + defer tc.Stopper().Stop(ctx) + + sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0 /* idx */)) + sqlDB.CheckQueryResults(t, + "SELECT count(*) FROM [SHOW SCHEDULES FOR SQL STATISTICS]", + [][]string{{"0"}}) + + sqlDB.Exec(t, `SET CLUSTER SETTING version = $1`, + clusterversion.ByKey(clusterversion.SQLStatsCompactionScheduledJob).String()) + + // Change the recurrence cluster setting to force job monitor to create the + // sql stats compaction schedule. + sqlDB.Exec(t, "SET CLUSTER SETTING sql.stats.cleanup.recurrence = '@daily'") + + // Wait for the change is picked up by the job monitor. + sqlDB.CheckQueryResultsRetry(t, + "SELECT recurrence FROM [SHOW SCHEDULES FOR SQL STATISTICS]", + [][]string{{"@daily"}}) + + sqlDB.CheckQueryResults(t, + "SELECT count(*) FROM [SHOW SCHEDULES FOR SQL STATISTICS]", + [][]string{{"1"}}) +} diff --git a/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go b/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go index 7f766df0a133..4fe59a8a088b 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go +++ b/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go @@ -86,7 +86,11 @@ func (j *jobMonitor) start(ctx context.Context, stopper *stop.Stopper) { func (j *jobMonitor) registerClusterSettingHook() { SQLStatsCleanupRecurrence.SetOnChange(&j.st.SV, func(ctx context.Context) { + if !j.isVersionCompatible(ctx) { + return + } if err := j.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + j.ensureSchedule(ctx) sj, err := j.getSchedule(ctx, txn) if err != nil { return err @@ -137,9 +141,9 @@ func (j *jobMonitor) getSchedule( } func (j *jobMonitor) ensureSchedule(ctx context.Context) { - clusterVersion := j.st.Version.ActiveVersionOrEmpty(ctx) - if !clusterVersion.IsActive(clusterversion.SQLStatsCompactionScheduledJob) { - log.Warningf(ctx, "cannot create sql stats scheduled compaction job because current cluster version is too low") + if !j.isVersionCompatible(ctx) { + log.Error(ctx, "cannot create sql stats scheduled compaction job because current cluster version is too low") + return } var sj *jobs.ScheduledJob @@ -168,6 +172,14 @@ func (j *jobMonitor) ensureSchedule(ctx context.Context) { } } +func (j *jobMonitor) isVersionCompatible(ctx context.Context) bool { + clusterVersion := j.st.Version.ActiveVersionOrEmpty(ctx) + if !clusterVersion.IsActive(clusterversion.SQLStatsCompactionScheduledJob) { + return false + } + return true +} + // CheckScheduleAnomaly checks a given schedule to see if it is either paused // or has unusually long run interval. func CheckScheduleAnomaly(sj *jobs.ScheduledJob) error {