From ac8fac9c757736d928e425c49db29e8bae53606c Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Thu, 24 Mar 2022 21:21:43 -0400 Subject: [PATCH 1/2] sql: Do not check existence of stats compaction job. Scheduled jobs system, by default, ensures that there is only one instance of the job that is currently executing for the schedule. As such, it is not necessary to verify the compaction job does not exist when starting stats compaction job from schedule. Furthermore, due to the interaction of scheduling system, such checks results in wide system.jobs table scan, which causes scheduled job execution to be restarted if any other job modifies system.jobs table. Fixes #78465 Release Notes (sql): Stats compaction scheduled job no longer cause intent buildup. Release Justification: important stability fix to ensure jobs and scheduled jobs do not lock up when running stats compaction job. --- pkg/server/BUILD.bazel | 2 + pkg/sql/compact_sql_stats.go | 11 +- .../sqlstats/persistedsqlstats/BUILD.bazel | 3 - .../compaction_scheduling.go | 36 +----- .../persistedsqlstats/compaction_test.go | 111 ------------------ .../sqlstats/persistedsqlstats/provider.go | 5 - 6 files changed, 7 insertions(+), 161 deletions(-) diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 71e41737ac27..e454b42f5f10 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "doc.go", "drain.go", "grpc_server.go", + "import_ts.go", "index_usage_stats.go", "init.go", "init_handshake.go", @@ -404,6 +405,7 @@ go_test( "@org_golang_google_grpc//:go_default_library", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//credentials", + "@org_golang_google_grpc//metadata", "@org_golang_google_grpc//status", "@org_golang_x_crypto//bcrypt", ], diff --git a/pkg/sql/compact_sql_stats.go b/pkg/sql/compact_sql_stats.go index e7c28c452263..e011ac15ec4f 100644 --- a/pkg/sql/compact_sql_stats.go +++ b/pkg/sql/compact_sql_stats.go @@ -68,15 +68,6 @@ func (r *sqlStatsCompactionResumer) Resume(ctx context.Context, execCtx interfac return err } - // We check for concurrently running SQL Stats compaction jobs. We only allow - // one job to be running at the same time. - if err := persistedsqlstats.CheckExistingCompactionJob(ctx, r.job, ie, nil /* txn */); err != nil { - if errors.Is(err, persistedsqlstats.ErrConcurrentSQLStatsCompaction) { - log.Infof(ctx, "exiting due to a running sql stats compaction job") - } - return err - } - statsCompactor := persistedsqlstats.NewStatsCompactor( r.st, ie, @@ -202,7 +193,7 @@ func (e *scheduledSQLStatsCompactionExecutor) createSQLStatsCompactionJob( persistedsqlstats.CreateCompactionJob(ctx, &jobs.CreatedByInfo{ ID: sj.ScheduleID(), Name: jobs.CreatedByScheduledJobs, - }, txn, cfg.InternalExecutor, p.(*planner).ExecCfg().JobRegistry) + }, txn, p.(*planner).ExecCfg().JobRegistry) if err != nil { return err diff --git a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel index 4acc503a5d60..ccd8c41f0563 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel +++ b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel @@ -72,8 +72,6 @@ go_test( "//pkg/jobs/jobspb", "//pkg/jobs/jobstest", "//pkg/kv", - "//pkg/kv/kvserver", - "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb:with-mocks", "//pkg/scheduledjobs", "//pkg/security", @@ -81,7 +79,6 @@ go_test( "//pkg/server", "//pkg/sql", "//pkg/sql/catalog", - "//pkg/sql/catalog/descpb", "//pkg/sql/sessiondata", "//pkg/sql/sqlstats", "//pkg/sql/sqlutil", diff --git a/pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go b/pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go index d3af280c62fd..5d8f897b37c1 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go +++ b/pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go @@ -79,20 +79,12 @@ func CreateSQLStatsCompactionScheduleIfNotYetExist( return compactionSchedule, nil } -// CreateCompactionJob creates a system.jobs record if there is no other -// SQL Stats compaction job running. This is invoked by the scheduled job -// Executor. +// CreateCompactionJob creates a system.jobs record. +// We do not need to worry about checking if the job already exist; +// at most 1 job semantics are enforced by scheduled jobs system. func CreateCompactionJob( - ctx context.Context, - createdByInfo *jobs.CreatedByInfo, - txn *kv.Txn, - ie sqlutil.InternalExecutor, - jobRegistry *jobs.Registry, + ctx context.Context, createdByInfo *jobs.CreatedByInfo, txn *kv.Txn, jobRegistry *jobs.Registry, ) (jobspb.JobID, error) { - if err := CheckExistingCompactionJob(ctx, nil /* job */, ie, txn); err != nil { - return jobspb.InvalidJobID, err - } - record := jobs.Record{ Description: "automatic SQL Stats compaction", Username: security.NodeUserName(), @@ -108,26 +100,6 @@ func CreateCompactionJob( return jobID, nil } -// CheckExistingCompactionJob checks for existing SQL Stats Compaction job -// that are either PAUSED, CANCELED, or RUNNING. If so, it returns a -// ErrConcurrentSQLStatsCompaction. -func CheckExistingCompactionJob( - ctx context.Context, job *jobs.Job, ie sqlutil.InternalExecutor, txn *kv.Txn, -) error { - jobID := jobspb.InvalidJobID - if job != nil { - jobID = job.ID() - } - exists, err := jobs.RunningJobExists(ctx, jobID, ie, txn, func(payload *jobspb.Payload) bool { - return payload.Type() == jobspb.TypeAutoSQLStatsCompaction - }) - - if err == nil && exists { - err = ErrConcurrentSQLStatsCompaction - } - return err -} - func checkExistingCompactionSchedule( ctx context.Context, ie sqlutil.InternalExecutor, txn *kv.Txn, ) (exists bool, _ error) { diff --git a/pkg/sql/sqlstats/persistedsqlstats/compaction_test.go b/pkg/sql/sqlstats/persistedsqlstats/compaction_test.go index 269655cb1c2c..e89230bbd067 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/compaction_test.go +++ b/pkg/sql/sqlstats/persistedsqlstats/compaction_test.go @@ -20,19 +20,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/tests" - "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/encoding" @@ -202,76 +197,6 @@ func TestSQLStatsCompactor(t *testing.T) { } } -func TestAtMostOneSQLStatsCompactionJob(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - var serverArgs base.TestServerArgs - var allowRequest chan struct{} - - serverArgs.Knobs.SQLStatsKnobs = &sqlstats.TestingKnobs{ - AOSTClause: "AS OF SYSTEM TIME '-1us'", - } - - params := base.TestClusterArgs{ServerArgs: serverArgs} - params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals() - - allowRequest = make(chan struct{}) - params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{ - TestingRequestFilter: createStatsRequestFilter(t, allowRequest), - } - - ctx := context.Background() - tc := serverutils.StartNewTestCluster(t, 3 /* numNodes */, params) - defer tc.Stopper().Stop(ctx) - conn := tc.ServerConn(0 /* idx */) - sqlDB := sqlutils.MakeSQLRunner(conn) - server := tc.Server(0 /* idx */) - - jobID, err := launchSQLStatsCompactionJob(server) - require.NoError(t, err) - - // We wait until the job appears in the system.jobs table. - sqlDB.CheckQueryResultsRetry( - t, - fmt.Sprintf(`SELECT count(*) FROM system.jobs where id = %d`, jobID), - [][]string{{"1"}}, - ) - - allowRequest <- struct{}{} - - // Launching a second job should fail here since we are still blocking the - // the first job's execution through allowRequest. (Since we need to send - // struct{}{} twice into the channel to fully unblock it.) - _, err = launchSQLStatsCompactionJob(server) - expected := persistedsqlstats.ErrConcurrentSQLStatsCompaction.Error() - if !testutils.IsError(err, expected) { - t.Fatalf("expected '%s' error, but got %+v", expected, err) - } - - allowRequest <- struct{}{} - close(allowRequest) - - // We wait until the first job finishes. - sqlDB.CheckQueryResultsRetry( - t, - fmt.Sprintf(`SELECT count(*) FROM system.jobs where id = %d AND status = 'succeeded'`, jobID), - [][]string{{"1"}}, - ) - - // Launching the job now should succeed. - jobID, err = launchSQLStatsCompactionJob(server) - require.NoError(t, err) - - // Wait until the second job to finish for sanity check. - server.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue() - sqlDB.CheckQueryResultsRetry( - t, - fmt.Sprintf(`SELECT count(*) FROM system.jobs where id = %d AND status = 'succeeded'`, jobID), - [][]string{{"1"}}, - ) -} - func TestSQLStatsCompactionJobMarkedAsAutomatic(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -310,7 +235,6 @@ func TestSQLStatsCompactionJobMarkedAsAutomatic(t *testing.T) { func launchSQLStatsCompactionJob(server serverutils.TestServerInterface) (jobspb.JobID, error) { return persistedsqlstats.CreateCompactionJob( context.Background(), nil /* createdByInfo */, nil, /* txn */ - server.InternalExecutor().(sqlutil.InternalExecutor), server.JobRegistry().(*jobs.Registry), ) } @@ -367,38 +291,3 @@ ORDER BY aggregated_ts` return stmtFingerprints, txnFingerprints } - -func createStatsRequestFilter( - t *testing.T, allowToProgress chan struct{}, -) kvserverbase.ReplicaRequestFilter { - // Start a test server here, so we can get the descriptor ID for the system - // table. This allows us to not hardcode the descriptor ID. - s, sqlConn, _ := serverutils.StartServer(t, base.TestServerArgs{}) - defer func() { - s.Stopper().Stop(context.Background()) - err := sqlConn.Close() - require.NoError(t, err) - }() - sqlDB := sqlutils.MakeSQLRunner(sqlConn) - - stmtStatsTableID, txnStatsTableID := getStatsTablesIDs(t, sqlDB) - return func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { - if req, ok := ba.GetArg(roachpb.Scan); ok { - _, tableID, _ := encoding.DecodeUvarintAscending(req.(*roachpb.ScanRequest).Key) - if descpb.ID(tableID) == stmtStatsTableID || descpb.ID(tableID) == txnStatsTableID { - <-allowToProgress - <-allowToProgress - } - } - return nil - } -} - -func getStatsTablesIDs( - t *testing.T, sqlDB *sqlutils.SQLRunner, -) (stmtStatsTableID, txnStatsTableID descpb.ID) { - stmt := - "select 'system.statement_statistics'::regclass::oid, 'system.transaction_statistics'::regclass::oid" - sqlDB.QueryRow(t, stmt).Scan(&stmtStatsTableID, &txnStatsTableID) - return stmtStatsTableID, txnStatsTableID -} diff --git a/pkg/sql/sqlstats/persistedsqlstats/provider.go b/pkg/sql/sqlstats/persistedsqlstats/provider.go index 911d00b6c8ed..2bc86ebc73b4 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/provider.go +++ b/pkg/sql/sqlstats/persistedsqlstats/provider.go @@ -31,7 +31,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/errors" ) const ( @@ -42,10 +41,6 @@ const ( dummyPlanHash = int64(0) ) -// ErrConcurrentSQLStatsCompaction is reported when two sql stats compaction -// jobs are issued concurrently. This is a sentinel error. -var ErrConcurrentSQLStatsCompaction = errors.New("another sql stats compaction job is already running") - // Config is a configuration struct for the persisted SQL stats subsystem. type Config struct { Settings *cluster.Settings From fd07e3e9fade9e9995ec5542e80f7eabf3b07239 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Thu, 24 Mar 2022 21:46:51 -0400 Subject: [PATCH 2/2] jobs: Remove unnecessary and expensive scheduled jobs metrics. Remove `schedules.round.schedules-ready-to-run` and `schedules.round.num-jobs-running` metrics from job scheduler. These metrics are very expensive to compute as they involve running wider table scans against both `system.jobs` and `system.scheduled_job`. In addition to being expensive to compute, these metrics are not needed since the query can be executed directly if needed, and, in addition these metrics are confusing since these metrics are per node, while the number of running jobs/schedules is cluster wide. More importantly, they can lead to job scheduler query being more expensive since they increase the read set of the scheduler transaction, thus causing txn restarts to be more expensive. Fixes #78447 Release Notes (enterprise): Remove expensive, unnecessary, and never used `schedules.round.schedules-ready-to-run` and `schedules.round.num-jobs-running` metrics from job schedulers. Release Justification: Stability fix for scheduled job system. --- pkg/jobs/job_scheduler.go | 44 +++------------------------------- pkg/jobs/job_scheduler_test.go | 7 +++++- pkg/jobs/schedule_metrics.go | 18 -------------- 3 files changed, 9 insertions(+), 60 deletions(-) diff --git a/pkg/jobs/job_scheduler.go b/pkg/jobs/job_scheduler.go index 2e1bfcd214fd..313bde7d939f 100644 --- a/pkg/jobs/job_scheduler.go +++ b/pkg/jobs/job_scheduler.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" @@ -118,14 +117,11 @@ const recheckRunningAfter = 1 * time.Minute type loopStats struct { rescheduleWait, rescheduleSkip, started int64 - readyToRun, jobsRunning int64 malformed int64 } func (s *loopStats) updateMetrics(m *SchedulerMetrics) { m.NumStarted.Update(s.started) - m.ReadyToRun.Update(s.readyToRun) - m.NumRunning.Update(s.jobsRunning) m.RescheduleSkip.Update(s.rescheduleSkip) m.RescheduleWait.Update(s.rescheduleWait) m.NumMalformedSchedules.Update(s.malformed) @@ -194,36 +190,6 @@ func (s *jobScheduler) processSchedule( return schedule.Update(ctx, s.InternalExecutor, txn) } -// TODO(yevgeniy): Re-evaluate if we need to have per-loop execution statistics. -func newLoopStats( - ctx context.Context, env scheduledjobs.JobSchedulerEnv, ex sqlutil.InternalExecutor, txn *kv.Txn, -) (*loopStats, error) { - numRunningJobsStmt := fmt.Sprintf( - "SELECT count(*) FROM %s WHERE created_by_type = '%s' AND status NOT IN ('%s', '%s', '%s', '%s')", - env.SystemJobsTableName(), CreatedByScheduledJobs, - StatusSucceeded, StatusCanceled, StatusFailed, StatusRevertFailed) - readyToRunStmt := fmt.Sprintf( - "SELECT count(*) FROM %s WHERE next_run < %s", - env.ScheduledJobsTableName(), env.NowExpr()) - statsStmt := fmt.Sprintf( - "SELECT (%s) numReadySchedules, (%s) numRunningJobs", - readyToRunStmt, numRunningJobsStmt) - - datums, err := ex.QueryRowEx(ctx, "scheduler-stats", txn, - sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, - statsStmt) - if err != nil { - return nil, err - } - if datums == nil { - return nil, errors.New("failed to read scheduler stats") - } - stats := &loopStats{} - stats.readyToRun = int64(tree.MustBeDInt(datums[0])) - stats.jobsRunning = int64(tree.MustBeDInt(datums[1])) - return stats, nil -} - type savePointError struct { err error } @@ -268,11 +234,7 @@ func withSavePoint(ctx context.Context, txn *kv.Txn, fn func() error) error { func (s *jobScheduler) executeSchedules( ctx context.Context, maxSchedules int64, txn *kv.Txn, ) (retErr error) { - stats, err := newLoopStats(ctx, s.env, s.InternalExecutor, txn) - if err != nil { - return err - } - + var stats loopStats defer stats.updateMetrics(&s.metrics) findSchedulesStmt := getFindSchedulesStatement(s.env, maxSchedules) @@ -308,10 +270,10 @@ func (s *jobScheduler) executeSchedules( return contextutil.RunWithTimeout( ctx, fmt.Sprintf("process-schedule-%d", schedule.ScheduleID()), timeout, func(ctx context.Context) error { - return s.processSchedule(ctx, schedule, numRunning, stats, txn) + return s.processSchedule(ctx, schedule, numRunning, &stats, txn) }) } - return s.processSchedule(ctx, schedule, numRunning, stats, txn) + return s.processSchedule(ctx, schedule, numRunning, &stats, txn) }); processErr != nil { if errors.HasType(processErr, (*savePointError)(nil)) { return errors.Wrapf(processErr, "savepoint error for schedule %d", schedule.ScheduleID()) diff --git a/pkg/jobs/job_scheduler_test.go b/pkg/jobs/job_scheduler_test.go index 419bbba4623e..964e9f6d5971 100644 --- a/pkg/jobs/job_scheduler_test.go +++ b/pkg/jobs/job_scheduler_test.go @@ -454,8 +454,13 @@ func TestJobSchedulerDaemonHonorsMaxJobsLimit(t *testing.T) { daemon := newJobScheduler(h.cfg, h.env, metric.NewRegistry()) daemon.runDaemon(ctx, stopper) + readyToRunStmt := fmt.Sprintf( + "SELECT count(*) FROM %s WHERE next_run < %s", + h.env.ScheduledJobsTableName(), h.env.NowExpr()) testutils.SucceedsSoon(t, func() error { - if ready := daemon.metrics.ReadyToRun.Value(); numJobs != ready { + var ready int + h.sqlDB.QueryRow(t, readyToRunStmt).Scan(&ready) + if ready != numJobs-jobsPerIteration { return errors.Errorf("waiting for metric %d = %d", ready, numJobs) } return nil diff --git a/pkg/jobs/schedule_metrics.go b/pkg/jobs/schedule_metrics.go index f613e2b5296f..a60e88f64e25 100644 --- a/pkg/jobs/schedule_metrics.go +++ b/pkg/jobs/schedule_metrics.go @@ -31,12 +31,8 @@ func (m *ExecutorMetrics) MetricStruct() {} // SchedulerMetrics are metrics specific to job scheduler daemon. type SchedulerMetrics struct { - // Number of schedules that were ready to execute. - ReadyToRun *metric.Gauge // Number of scheduled jobs started. NumStarted *metric.Gauge - // Number of jobs started by schedules that are currently running. - NumRunning *metric.Gauge // Number of schedules rescheduled due to SKIP policy. RescheduleSkip *metric.Gauge // Number of schedules rescheduled due to WAIT policy. @@ -51,20 +47,6 @@ type SchedulerMetrics struct { // MakeSchedulerMetrics returns metrics for scheduled job daemon. func MakeSchedulerMetrics() SchedulerMetrics { return SchedulerMetrics{ - ReadyToRun: metric.NewGauge(metric.Metadata{ - Name: "schedules.round.schedules-ready-to-run", - Help: "The number of jobs ready to execute", - Measurement: "Schedules", - Unit: metric.Unit_COUNT, - }), - - NumRunning: metric.NewGauge(metric.Metadata{ - Name: "schedules.round.num-jobs-running", - Help: "The number of jobs started by schedules that are currently running", - Measurement: "Jobs", - Unit: metric.Unit_COUNT, - }), - NumStarted: metric.NewGauge(metric.Metadata{ Name: "schedules.round.jobs-started", Help: "The number of jobs started",