Skip to content

Commit

Permalink
Merge pull request #78583 from miretskiy/backport21.2-78467
Browse files Browse the repository at this point in the history
release-21.2: jobs, sql: Avoid jobs/scheduled jobs lock up
  • Loading branch information
miretskiy authored Mar 28, 2022
2 parents 143b253 + fd07e3e commit 48866a2
Show file tree
Hide file tree
Showing 9 changed files with 16 additions and 221 deletions.
44 changes: 3 additions & 41 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
Expand Down Expand Up @@ -118,14 +117,11 @@ const recheckRunningAfter = 1 * time.Minute

type loopStats struct {
rescheduleWait, rescheduleSkip, started int64
readyToRun, jobsRunning int64
malformed int64
}

func (s *loopStats) updateMetrics(m *SchedulerMetrics) {
m.NumStarted.Update(s.started)
m.ReadyToRun.Update(s.readyToRun)
m.NumRunning.Update(s.jobsRunning)
m.RescheduleSkip.Update(s.rescheduleSkip)
m.RescheduleWait.Update(s.rescheduleWait)
m.NumMalformedSchedules.Update(s.malformed)
Expand Down Expand Up @@ -194,36 +190,6 @@ func (s *jobScheduler) processSchedule(
return schedule.Update(ctx, s.InternalExecutor, txn)
}

// TODO(yevgeniy): Re-evaluate if we need to have per-loop execution statistics.
func newLoopStats(
ctx context.Context, env scheduledjobs.JobSchedulerEnv, ex sqlutil.InternalExecutor, txn *kv.Txn,
) (*loopStats, error) {
numRunningJobsStmt := fmt.Sprintf(
"SELECT count(*) FROM %s WHERE created_by_type = '%s' AND status NOT IN ('%s', '%s', '%s', '%s')",
env.SystemJobsTableName(), CreatedByScheduledJobs,
StatusSucceeded, StatusCanceled, StatusFailed, StatusRevertFailed)
readyToRunStmt := fmt.Sprintf(
"SELECT count(*) FROM %s WHERE next_run < %s",
env.ScheduledJobsTableName(), env.NowExpr())
statsStmt := fmt.Sprintf(
"SELECT (%s) numReadySchedules, (%s) numRunningJobs",
readyToRunStmt, numRunningJobsStmt)

datums, err := ex.QueryRowEx(ctx, "scheduler-stats", txn,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
statsStmt)
if err != nil {
return nil, err
}
if datums == nil {
return nil, errors.New("failed to read scheduler stats")
}
stats := &loopStats{}
stats.readyToRun = int64(tree.MustBeDInt(datums[0]))
stats.jobsRunning = int64(tree.MustBeDInt(datums[1]))
return stats, nil
}

type savePointError struct {
err error
}
Expand Down Expand Up @@ -268,11 +234,7 @@ func withSavePoint(ctx context.Context, txn *kv.Txn, fn func() error) error {
func (s *jobScheduler) executeSchedules(
ctx context.Context, maxSchedules int64, txn *kv.Txn,
) (retErr error) {
stats, err := newLoopStats(ctx, s.env, s.InternalExecutor, txn)
if err != nil {
return err
}

var stats loopStats
defer stats.updateMetrics(&s.metrics)

findSchedulesStmt := getFindSchedulesStatement(s.env, maxSchedules)
Expand Down Expand Up @@ -308,10 +270,10 @@ func (s *jobScheduler) executeSchedules(
return contextutil.RunWithTimeout(
ctx, fmt.Sprintf("process-schedule-%d", schedule.ScheduleID()), timeout,
func(ctx context.Context) error {
return s.processSchedule(ctx, schedule, numRunning, stats, txn)
return s.processSchedule(ctx, schedule, numRunning, &stats, txn)
})
}
return s.processSchedule(ctx, schedule, numRunning, stats, txn)
return s.processSchedule(ctx, schedule, numRunning, &stats, txn)
}); processErr != nil {
if errors.HasType(processErr, (*savePointError)(nil)) {
return errors.Wrapf(processErr, "savepoint error for schedule %d", schedule.ScheduleID())
Expand Down
7 changes: 6 additions & 1 deletion pkg/jobs/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,13 @@ func TestJobSchedulerDaemonHonorsMaxJobsLimit(t *testing.T) {
daemon := newJobScheduler(h.cfg, h.env, metric.NewRegistry())
daemon.runDaemon(ctx, stopper)

readyToRunStmt := fmt.Sprintf(
"SELECT count(*) FROM %s WHERE next_run < %s",
h.env.ScheduledJobsTableName(), h.env.NowExpr())
testutils.SucceedsSoon(t, func() error {
if ready := daemon.metrics.ReadyToRun.Value(); numJobs != ready {
var ready int
h.sqlDB.QueryRow(t, readyToRunStmt).Scan(&ready)
if ready != numJobs-jobsPerIteration {
return errors.Errorf("waiting for metric %d = %d", ready, numJobs)
}
return nil
Expand Down
18 changes: 0 additions & 18 deletions pkg/jobs/schedule_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@ func (m *ExecutorMetrics) MetricStruct() {}

// SchedulerMetrics are metrics specific to job scheduler daemon.
type SchedulerMetrics struct {
// Number of schedules that were ready to execute.
ReadyToRun *metric.Gauge
// Number of scheduled jobs started.
NumStarted *metric.Gauge
// Number of jobs started by schedules that are currently running.
NumRunning *metric.Gauge
// Number of schedules rescheduled due to SKIP policy.
RescheduleSkip *metric.Gauge
// Number of schedules rescheduled due to WAIT policy.
Expand All @@ -51,20 +47,6 @@ type SchedulerMetrics struct {
// MakeSchedulerMetrics returns metrics for scheduled job daemon.
func MakeSchedulerMetrics() SchedulerMetrics {
return SchedulerMetrics{
ReadyToRun: metric.NewGauge(metric.Metadata{
Name: "schedules.round.schedules-ready-to-run",
Help: "The number of jobs ready to execute",
Measurement: "Schedules",
Unit: metric.Unit_COUNT,
}),

NumRunning: metric.NewGauge(metric.Metadata{
Name: "schedules.round.num-jobs-running",
Help: "The number of jobs started by schedules that are currently running",
Measurement: "Jobs",
Unit: metric.Unit_COUNT,
}),

NumStarted: metric.NewGauge(metric.Metadata{
Name: "schedules.round.jobs-started",
Help: "The number of jobs started",
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"doc.go",
"drain.go",
"grpc_server.go",
"import_ts.go",
"index_usage_stats.go",
"init.go",
"init_handshake.go",
Expand Down Expand Up @@ -404,6 +405,7 @@ go_test(
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//status",
"@org_golang_x_crypto//bcrypt",
],
Expand Down
11 changes: 1 addition & 10 deletions pkg/sql/compact_sql_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,6 @@ func (r *sqlStatsCompactionResumer) Resume(ctx context.Context, execCtx interfac
return err
}

// We check for concurrently running SQL Stats compaction jobs. We only allow
// one job to be running at the same time.
if err := persistedsqlstats.CheckExistingCompactionJob(ctx, r.job, ie, nil /* txn */); err != nil {
if errors.Is(err, persistedsqlstats.ErrConcurrentSQLStatsCompaction) {
log.Infof(ctx, "exiting due to a running sql stats compaction job")
}
return err
}

statsCompactor := persistedsqlstats.NewStatsCompactor(
r.st,
ie,
Expand Down Expand Up @@ -202,7 +193,7 @@ func (e *scheduledSQLStatsCompactionExecutor) createSQLStatsCompactionJob(
persistedsqlstats.CreateCompactionJob(ctx, &jobs.CreatedByInfo{
ID: sj.ScheduleID(),
Name: jobs.CreatedByScheduledJobs,
}, txn, cfg.InternalExecutor, p.(*planner).ExecCfg().JobRegistry)
}, txn, p.(*planner).ExecCfg().JobRegistry)

if err != nil {
return err
Expand Down
3 changes: 0 additions & 3 deletions pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,13 @@ go_test(
"//pkg/jobs/jobspb",
"//pkg/jobs/jobstest",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb:with-mocks",
"//pkg/scheduledjobs",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlstats",
"//pkg/sql/sqlutil",
Expand Down
36 changes: 4 additions & 32 deletions pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,12 @@ func CreateSQLStatsCompactionScheduleIfNotYetExist(
return compactionSchedule, nil
}

// CreateCompactionJob creates a system.jobs record if there is no other
// SQL Stats compaction job running. This is invoked by the scheduled job
// Executor.
// CreateCompactionJob creates a system.jobs record.
// We do not need to worry about checking if the job already exist;
// at most 1 job semantics are enforced by scheduled jobs system.
func CreateCompactionJob(
ctx context.Context,
createdByInfo *jobs.CreatedByInfo,
txn *kv.Txn,
ie sqlutil.InternalExecutor,
jobRegistry *jobs.Registry,
ctx context.Context, createdByInfo *jobs.CreatedByInfo, txn *kv.Txn, jobRegistry *jobs.Registry,
) (jobspb.JobID, error) {
if err := CheckExistingCompactionJob(ctx, nil /* job */, ie, txn); err != nil {
return jobspb.InvalidJobID, err
}

record := jobs.Record{
Description: "automatic SQL Stats compaction",
Username: security.NodeUserName(),
Expand All @@ -108,26 +100,6 @@ func CreateCompactionJob(
return jobID, nil
}

// CheckExistingCompactionJob checks for existing SQL Stats Compaction job
// that are either PAUSED, CANCELED, or RUNNING. If so, it returns a
// ErrConcurrentSQLStatsCompaction.
func CheckExistingCompactionJob(
ctx context.Context, job *jobs.Job, ie sqlutil.InternalExecutor, txn *kv.Txn,
) error {
jobID := jobspb.InvalidJobID
if job != nil {
jobID = job.ID()
}
exists, err := jobs.RunningJobExists(ctx, jobID, ie, txn, func(payload *jobspb.Payload) bool {
return payload.Type() == jobspb.TypeAutoSQLStatsCompaction
})

if err == nil && exists {
err = ErrConcurrentSQLStatsCompaction
}
return err
}

func checkExistingCompactionSchedule(
ctx context.Context, ie sqlutil.InternalExecutor, txn *kv.Txn,
) (exists bool, _ error) {
Expand Down
111 changes: 0 additions & 111 deletions pkg/sql/sqlstats/persistedsqlstats/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
Expand Down Expand Up @@ -202,76 +197,6 @@ func TestSQLStatsCompactor(t *testing.T) {
}
}

func TestAtMostOneSQLStatsCompactionJob(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var serverArgs base.TestServerArgs
var allowRequest chan struct{}

serverArgs.Knobs.SQLStatsKnobs = &sqlstats.TestingKnobs{
AOSTClause: "AS OF SYSTEM TIME '-1us'",
}

params := base.TestClusterArgs{ServerArgs: serverArgs}
params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()

allowRequest = make(chan struct{})
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: createStatsRequestFilter(t, allowRequest),
}

ctx := context.Background()
tc := serverutils.StartNewTestCluster(t, 3 /* numNodes */, params)
defer tc.Stopper().Stop(ctx)
conn := tc.ServerConn(0 /* idx */)
sqlDB := sqlutils.MakeSQLRunner(conn)
server := tc.Server(0 /* idx */)

jobID, err := launchSQLStatsCompactionJob(server)
require.NoError(t, err)

// We wait until the job appears in the system.jobs table.
sqlDB.CheckQueryResultsRetry(
t,
fmt.Sprintf(`SELECT count(*) FROM system.jobs where id = %d`, jobID),
[][]string{{"1"}},
)

allowRequest <- struct{}{}

// Launching a second job should fail here since we are still blocking the
// the first job's execution through allowRequest. (Since we need to send
// struct{}{} twice into the channel to fully unblock it.)
_, err = launchSQLStatsCompactionJob(server)
expected := persistedsqlstats.ErrConcurrentSQLStatsCompaction.Error()
if !testutils.IsError(err, expected) {
t.Fatalf("expected '%s' error, but got %+v", expected, err)
}

allowRequest <- struct{}{}
close(allowRequest)

// We wait until the first job finishes.
sqlDB.CheckQueryResultsRetry(
t,
fmt.Sprintf(`SELECT count(*) FROM system.jobs where id = %d AND status = 'succeeded'`, jobID),
[][]string{{"1"}},
)

// Launching the job now should succeed.
jobID, err = launchSQLStatsCompactionJob(server)
require.NoError(t, err)

// Wait until the second job to finish for sanity check.
server.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
sqlDB.CheckQueryResultsRetry(
t,
fmt.Sprintf(`SELECT count(*) FROM system.jobs where id = %d AND status = 'succeeded'`, jobID),
[][]string{{"1"}},
)
}

func TestSQLStatsCompactionJobMarkedAsAutomatic(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -310,7 +235,6 @@ func TestSQLStatsCompactionJobMarkedAsAutomatic(t *testing.T) {
func launchSQLStatsCompactionJob(server serverutils.TestServerInterface) (jobspb.JobID, error) {
return persistedsqlstats.CreateCompactionJob(
context.Background(), nil /* createdByInfo */, nil, /* txn */
server.InternalExecutor().(sqlutil.InternalExecutor),
server.JobRegistry().(*jobs.Registry),
)
}
Expand Down Expand Up @@ -367,38 +291,3 @@ ORDER BY aggregated_ts`

return stmtFingerprints, txnFingerprints
}

func createStatsRequestFilter(
t *testing.T, allowToProgress chan struct{},
) kvserverbase.ReplicaRequestFilter {
// Start a test server here, so we can get the descriptor ID for the system
// table. This allows us to not hardcode the descriptor ID.
s, sqlConn, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer func() {
s.Stopper().Stop(context.Background())
err := sqlConn.Close()
require.NoError(t, err)
}()
sqlDB := sqlutils.MakeSQLRunner(sqlConn)

stmtStatsTableID, txnStatsTableID := getStatsTablesIDs(t, sqlDB)
return func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
if req, ok := ba.GetArg(roachpb.Scan); ok {
_, tableID, _ := encoding.DecodeUvarintAscending(req.(*roachpb.ScanRequest).Key)
if descpb.ID(tableID) == stmtStatsTableID || descpb.ID(tableID) == txnStatsTableID {
<-allowToProgress
<-allowToProgress
}
}
return nil
}
}

func getStatsTablesIDs(
t *testing.T, sqlDB *sqlutils.SQLRunner,
) (stmtStatsTableID, txnStatsTableID descpb.ID) {
stmt :=
"select 'system.statement_statistics'::regclass::oid, 'system.transaction_statistics'::regclass::oid"
sqlDB.QueryRow(t, stmt).Scan(&stmtStatsTableID, &txnStatsTableID)
return stmtStatsTableID, txnStatsTableID
}
Loading

0 comments on commit 48866a2

Please sign in to comment.