diff --git a/pkg/jobs/job_scheduler.go b/pkg/jobs/job_scheduler.go index 2e1bfcd214fd..313bde7d939f 100644 --- a/pkg/jobs/job_scheduler.go +++ b/pkg/jobs/job_scheduler.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" - "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" @@ -118,14 +117,11 @@ const recheckRunningAfter = 1 * time.Minute type loopStats struct { rescheduleWait, rescheduleSkip, started int64 - readyToRun, jobsRunning int64 malformed int64 } func (s *loopStats) updateMetrics(m *SchedulerMetrics) { m.NumStarted.Update(s.started) - m.ReadyToRun.Update(s.readyToRun) - m.NumRunning.Update(s.jobsRunning) m.RescheduleSkip.Update(s.rescheduleSkip) m.RescheduleWait.Update(s.rescheduleWait) m.NumMalformedSchedules.Update(s.malformed) @@ -194,36 +190,6 @@ func (s *jobScheduler) processSchedule( return schedule.Update(ctx, s.InternalExecutor, txn) } -// TODO(yevgeniy): Re-evaluate if we need to have per-loop execution statistics. 
-func newLoopStats( - ctx context.Context, env scheduledjobs.JobSchedulerEnv, ex sqlutil.InternalExecutor, txn *kv.Txn, -) (*loopStats, error) { - numRunningJobsStmt := fmt.Sprintf( - "SELECT count(*) FROM %s WHERE created_by_type = '%s' AND status NOT IN ('%s', '%s', '%s', '%s')", - env.SystemJobsTableName(), CreatedByScheduledJobs, - StatusSucceeded, StatusCanceled, StatusFailed, StatusRevertFailed) - readyToRunStmt := fmt.Sprintf( - "SELECT count(*) FROM %s WHERE next_run < %s", - env.ScheduledJobsTableName(), env.NowExpr()) - statsStmt := fmt.Sprintf( - "SELECT (%s) numReadySchedules, (%s) numRunningJobs", - readyToRunStmt, numRunningJobsStmt) - - datums, err := ex.QueryRowEx(ctx, "scheduler-stats", txn, - sessiondata.InternalExecutorOverride{User: security.NodeUserName()}, - statsStmt) - if err != nil { - return nil, err - } - if datums == nil { - return nil, errors.New("failed to read scheduler stats") - } - stats := &loopStats{} - stats.readyToRun = int64(tree.MustBeDInt(datums[0])) - stats.jobsRunning = int64(tree.MustBeDInt(datums[1])) - return stats, nil -} - type savePointError struct { err error } @@ -268,11 +234,7 @@ func withSavePoint(ctx context.Context, txn *kv.Txn, fn func() error) error { func (s *jobScheduler) executeSchedules( ctx context.Context, maxSchedules int64, txn *kv.Txn, ) (retErr error) { - stats, err := newLoopStats(ctx, s.env, s.InternalExecutor, txn) - if err != nil { - return err - } - + var stats loopStats defer stats.updateMetrics(&s.metrics) findSchedulesStmt := getFindSchedulesStatement(s.env, maxSchedules) @@ -308,10 +270,10 @@ func (s *jobScheduler) executeSchedules( return contextutil.RunWithTimeout( ctx, fmt.Sprintf("process-schedule-%d", schedule.ScheduleID()), timeout, func(ctx context.Context) error { - return s.processSchedule(ctx, schedule, numRunning, stats, txn) + return s.processSchedule(ctx, schedule, numRunning, &stats, txn) }) } - return s.processSchedule(ctx, schedule, numRunning, stats, txn) + return 
s.processSchedule(ctx, schedule, numRunning, &stats, txn) }); processErr != nil { if errors.HasType(processErr, (*savePointError)(nil)) { return errors.Wrapf(processErr, "savepoint error for schedule %d", schedule.ScheduleID()) diff --git a/pkg/jobs/job_scheduler_test.go b/pkg/jobs/job_scheduler_test.go index 419bbba4623e..964e9f6d5971 100644 --- a/pkg/jobs/job_scheduler_test.go +++ b/pkg/jobs/job_scheduler_test.go @@ -454,8 +454,13 @@ func TestJobSchedulerDaemonHonorsMaxJobsLimit(t *testing.T) { daemon := newJobScheduler(h.cfg, h.env, metric.NewRegistry()) daemon.runDaemon(ctx, stopper) + readyToRunStmt := fmt.Sprintf( + "SELECT count(*) FROM %s WHERE next_run < %s", + h.env.ScheduledJobsTableName(), h.env.NowExpr()) testutils.SucceedsSoon(t, func() error { - if ready := daemon.metrics.ReadyToRun.Value(); numJobs != ready { + var ready int + h.sqlDB.QueryRow(t, readyToRunStmt).Scan(&ready) + if ready != numJobs-jobsPerIteration { return errors.Errorf("waiting for metric %d = %d", ready, numJobs) } return nil diff --git a/pkg/jobs/schedule_metrics.go b/pkg/jobs/schedule_metrics.go index f613e2b5296f..a60e88f64e25 100644 --- a/pkg/jobs/schedule_metrics.go +++ b/pkg/jobs/schedule_metrics.go @@ -31,12 +31,8 @@ func (m *ExecutorMetrics) MetricStruct() {} // SchedulerMetrics are metrics specific to job scheduler daemon. type SchedulerMetrics struct { - // Number of schedules that were ready to execute. - ReadyToRun *metric.Gauge // Number of scheduled jobs started. NumStarted *metric.Gauge - // Number of jobs started by schedules that are currently running. - NumRunning *metric.Gauge // Number of schedules rescheduled due to SKIP policy. RescheduleSkip *metric.Gauge // Number of schedules rescheduled due to WAIT policy. @@ -51,20 +47,6 @@ type SchedulerMetrics struct { // MakeSchedulerMetrics returns metrics for scheduled job daemon. 
func MakeSchedulerMetrics() SchedulerMetrics { return SchedulerMetrics{ - ReadyToRun: metric.NewGauge(metric.Metadata{ - Name: "schedules.round.schedules-ready-to-run", - Help: "The number of jobs ready to execute", - Measurement: "Schedules", - Unit: metric.Unit_COUNT, - }), - - NumRunning: metric.NewGauge(metric.Metadata{ - Name: "schedules.round.num-jobs-running", - Help: "The number of jobs started by schedules that are currently running", - Measurement: "Jobs", - Unit: metric.Unit_COUNT, - }), - NumStarted: metric.NewGauge(metric.Metadata{ Name: "schedules.round.jobs-started", Help: "The number of jobs started", diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 71e41737ac27..e454b42f5f10 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "doc.go", "drain.go", "grpc_server.go", + "import_ts.go", "index_usage_stats.go", "init.go", "init_handshake.go", @@ -404,6 +405,7 @@ go_test( "@org_golang_google_grpc//:go_default_library", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//credentials", + "@org_golang_google_grpc//metadata", "@org_golang_google_grpc//status", "@org_golang_x_crypto//bcrypt", ], diff --git a/pkg/sql/compact_sql_stats.go b/pkg/sql/compact_sql_stats.go index e7c28c452263..e011ac15ec4f 100644 --- a/pkg/sql/compact_sql_stats.go +++ b/pkg/sql/compact_sql_stats.go @@ -68,15 +68,6 @@ func (r *sqlStatsCompactionResumer) Resume(ctx context.Context, execCtx interfac return err } - // We check for concurrently running SQL Stats compaction jobs. We only allow - // one job to be running at the same time. 
- if err := persistedsqlstats.CheckExistingCompactionJob(ctx, r.job, ie, nil /* txn */); err != nil { - if errors.Is(err, persistedsqlstats.ErrConcurrentSQLStatsCompaction) { - log.Infof(ctx, "exiting due to a running sql stats compaction job") - } - return err - } - statsCompactor := persistedsqlstats.NewStatsCompactor( r.st, ie, @@ -202,7 +193,7 @@ func (e *scheduledSQLStatsCompactionExecutor) createSQLStatsCompactionJob( persistedsqlstats.CreateCompactionJob(ctx, &jobs.CreatedByInfo{ ID: sj.ScheduleID(), Name: jobs.CreatedByScheduledJobs, - }, txn, cfg.InternalExecutor, p.(*planner).ExecCfg().JobRegistry) + }, txn, p.(*planner).ExecCfg().JobRegistry) if err != nil { return err diff --git a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel index 4acc503a5d60..ccd8c41f0563 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel +++ b/pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel @@ -72,8 +72,6 @@ go_test( "//pkg/jobs/jobspb", "//pkg/jobs/jobstest", "//pkg/kv", - "//pkg/kv/kvserver", - "//pkg/kv/kvserver/kvserverbase", "//pkg/roachpb:with-mocks", "//pkg/scheduledjobs", "//pkg/security", @@ -81,7 +79,6 @@ go_test( "//pkg/server", "//pkg/sql", "//pkg/sql/catalog", - "//pkg/sql/catalog/descpb", "//pkg/sql/sessiondata", "//pkg/sql/sqlstats", "//pkg/sql/sqlutil", diff --git a/pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go b/pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go index d3af280c62fd..5d8f897b37c1 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go +++ b/pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go @@ -79,20 +79,12 @@ func CreateSQLStatsCompactionScheduleIfNotYetExist( return compactionSchedule, nil } -// CreateCompactionJob creates a system.jobs record if there is no other -// SQL Stats compaction job running. This is invoked by the scheduled job -// Executor. +// CreateCompactionJob creates a system.jobs record. 
+// We do not need to worry about checking if the job already exists; +// at most 1 job semantics are enforced by the scheduled jobs system. func CreateCompactionJob( - ctx context.Context, - createdByInfo *jobs.CreatedByInfo, - txn *kv.Txn, - ie sqlutil.InternalExecutor, - jobRegistry *jobs.Registry, + ctx context.Context, createdByInfo *jobs.CreatedByInfo, txn *kv.Txn, jobRegistry *jobs.Registry, ) (jobspb.JobID, error) { - if err := CheckExistingCompactionJob(ctx, nil /* job */, ie, txn); err != nil { - return jobspb.InvalidJobID, err - } - record := jobs.Record{ Description: "automatic SQL Stats compaction", Username: security.NodeUserName(), @@ -108,26 +100,6 @@ func CreateCompactionJob( return jobID, nil } -// CheckExistingCompactionJob checks for existing SQL Stats Compaction job -// that are either PAUSED, CANCELED, or RUNNING. If so, it returns a -// ErrConcurrentSQLStatsCompaction. -func CheckExistingCompactionJob( - ctx context.Context, job *jobs.Job, ie sqlutil.InternalExecutor, txn *kv.Txn, -) error { - jobID := jobspb.InvalidJobID - if job != nil { - jobID = job.ID() - } - exists, err := jobs.RunningJobExists(ctx, jobID, ie, txn, func(payload *jobspb.Payload) bool { - return payload.Type() == jobspb.TypeAutoSQLStatsCompaction - }) - - if err == nil && exists { - err = ErrConcurrentSQLStatsCompaction - } - return err -} - func checkExistingCompactionSchedule( ctx context.Context, ie sqlutil.InternalExecutor, txn *kv.Txn, ) (exists bool, _ error) { diff --git a/pkg/sql/sqlstats/persistedsqlstats/compaction_test.go b/pkg/sql/sqlstats/persistedsqlstats/compaction_test.go index 269655cb1c2c..e89230bbd067 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/compaction_test.go +++ b/pkg/sql/sqlstats/persistedsqlstats/compaction_test.go @@ -20,19 +20,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - 
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" - "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/tests" - "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/encoding" @@ -202,76 +197,6 @@ func TestSQLStatsCompactor(t *testing.T) { } } -func TestAtMostOneSQLStatsCompactionJob(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - var serverArgs base.TestServerArgs - var allowRequest chan struct{} - - serverArgs.Knobs.SQLStatsKnobs = &sqlstats.TestingKnobs{ - AOSTClause: "AS OF SYSTEM TIME '-1us'", - } - - params := base.TestClusterArgs{ServerArgs: serverArgs} - params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals() - - allowRequest = make(chan struct{}) - params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{ - TestingRequestFilter: createStatsRequestFilter(t, allowRequest), - } - - ctx := context.Background() - tc := serverutils.StartNewTestCluster(t, 3 /* numNodes */, params) - defer tc.Stopper().Stop(ctx) - conn := tc.ServerConn(0 /* idx */) - sqlDB := sqlutils.MakeSQLRunner(conn) - server := tc.Server(0 /* idx */) - - jobID, err := launchSQLStatsCompactionJob(server) - require.NoError(t, err) - - // We wait until the job appears in the system.jobs table. 
- sqlDB.CheckQueryResultsRetry( - t, - fmt.Sprintf(`SELECT count(*) FROM system.jobs where id = %d`, jobID), - [][]string{{"1"}}, - ) - - allowRequest <- struct{}{} - - // Launching a second job should fail here since we are still blocking the - // the first job's execution through allowRequest. (Since we need to send - // struct{}{} twice into the channel to fully unblock it.) - _, err = launchSQLStatsCompactionJob(server) - expected := persistedsqlstats.ErrConcurrentSQLStatsCompaction.Error() - if !testutils.IsError(err, expected) { - t.Fatalf("expected '%s' error, but got %+v", expected, err) - } - - allowRequest <- struct{}{} - close(allowRequest) - - // We wait until the first job finishes. - sqlDB.CheckQueryResultsRetry( - t, - fmt.Sprintf(`SELECT count(*) FROM system.jobs where id = %d AND status = 'succeeded'`, jobID), - [][]string{{"1"}}, - ) - - // Launching the job now should succeed. - jobID, err = launchSQLStatsCompactionJob(server) - require.NoError(t, err) - - // Wait until the second job to finish for sanity check. 
- server.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue() - sqlDB.CheckQueryResultsRetry( - t, - fmt.Sprintf(`SELECT count(*) FROM system.jobs where id = %d AND status = 'succeeded'`, jobID), - [][]string{{"1"}}, - ) -} - func TestSQLStatsCompactionJobMarkedAsAutomatic(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -310,7 +235,6 @@ func TestSQLStatsCompactionJobMarkedAsAutomatic(t *testing.T) { func launchSQLStatsCompactionJob(server serverutils.TestServerInterface) (jobspb.JobID, error) { return persistedsqlstats.CreateCompactionJob( context.Background(), nil /* createdByInfo */, nil, /* txn */ - server.InternalExecutor().(sqlutil.InternalExecutor), server.JobRegistry().(*jobs.Registry), ) } @@ -367,38 +291,3 @@ ORDER BY aggregated_ts` return stmtFingerprints, txnFingerprints } - -func createStatsRequestFilter( - t *testing.T, allowToProgress chan struct{}, -) kvserverbase.ReplicaRequestFilter { - // Start a test server here, so we can get the descriptor ID for the system - // table. This allows us to not hardcode the descriptor ID. 
- s, sqlConn, _ := serverutils.StartServer(t, base.TestServerArgs{}) - defer func() { - s.Stopper().Stop(context.Background()) - err := sqlConn.Close() - require.NoError(t, err) - }() - sqlDB := sqlutils.MakeSQLRunner(sqlConn) - - stmtStatsTableID, txnStatsTableID := getStatsTablesIDs(t, sqlDB) - return func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error { - if req, ok := ba.GetArg(roachpb.Scan); ok { - _, tableID, _ := encoding.DecodeUvarintAscending(req.(*roachpb.ScanRequest).Key) - if descpb.ID(tableID) == stmtStatsTableID || descpb.ID(tableID) == txnStatsTableID { - <-allowToProgress - <-allowToProgress - } - } - return nil - } -} - -func getStatsTablesIDs( - t *testing.T, sqlDB *sqlutils.SQLRunner, -) (stmtStatsTableID, txnStatsTableID descpb.ID) { - stmt := - "select 'system.statement_statistics'::regclass::oid, 'system.transaction_statistics'::regclass::oid" - sqlDB.QueryRow(t, stmt).Scan(&stmtStatsTableID, &txnStatsTableID) - return stmtStatsTableID, txnStatsTableID -} diff --git a/pkg/sql/sqlstats/persistedsqlstats/provider.go b/pkg/sql/sqlstats/persistedsqlstats/provider.go index 911d00b6c8ed..2bc86ebc73b4 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/provider.go +++ b/pkg/sql/sqlstats/persistedsqlstats/provider.go @@ -31,7 +31,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/errors" ) const ( @@ -42,10 +41,6 @@ const ( dummyPlanHash = int64(0) ) -// ErrConcurrentSQLStatsCompaction is reported when two sql stats compaction -// jobs are issued concurrently. This is a sentinel error. -var ErrConcurrentSQLStatsCompaction = errors.New("another sql stats compaction job is already running") - // Config is a configuration struct for the persisted SQL stats subsystem. type Config struct { Settings *cluster.Settings