Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-21.2: jobs, sql: Avoid jobs/scheduled jobs lock up #78583

Merged
merged 2 commits into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 3 additions & 41 deletions pkg/jobs/job_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
Expand Down Expand Up @@ -118,14 +117,11 @@ const recheckRunningAfter = 1 * time.Minute

type loopStats struct {
rescheduleWait, rescheduleSkip, started int64
readyToRun, jobsRunning int64
malformed int64
}

func (s *loopStats) updateMetrics(m *SchedulerMetrics) {
m.NumStarted.Update(s.started)
m.ReadyToRun.Update(s.readyToRun)
m.NumRunning.Update(s.jobsRunning)
m.RescheduleSkip.Update(s.rescheduleSkip)
m.RescheduleWait.Update(s.rescheduleWait)
m.NumMalformedSchedules.Update(s.malformed)
Expand Down Expand Up @@ -194,36 +190,6 @@ func (s *jobScheduler) processSchedule(
return schedule.Update(ctx, s.InternalExecutor, txn)
}

// TODO(yevgeniy): Re-evaluate if we need to have per-loop execution statistics.
func newLoopStats(
ctx context.Context, env scheduledjobs.JobSchedulerEnv, ex sqlutil.InternalExecutor, txn *kv.Txn,
) (*loopStats, error) {
numRunningJobsStmt := fmt.Sprintf(
"SELECT count(*) FROM %s WHERE created_by_type = '%s' AND status NOT IN ('%s', '%s', '%s', '%s')",
env.SystemJobsTableName(), CreatedByScheduledJobs,
StatusSucceeded, StatusCanceled, StatusFailed, StatusRevertFailed)
readyToRunStmt := fmt.Sprintf(
"SELECT count(*) FROM %s WHERE next_run < %s",
env.ScheduledJobsTableName(), env.NowExpr())
statsStmt := fmt.Sprintf(
"SELECT (%s) numReadySchedules, (%s) numRunningJobs",
readyToRunStmt, numRunningJobsStmt)

datums, err := ex.QueryRowEx(ctx, "scheduler-stats", txn,
sessiondata.InternalExecutorOverride{User: security.NodeUserName()},
statsStmt)
if err != nil {
return nil, err
}
if datums == nil {
return nil, errors.New("failed to read scheduler stats")
}
stats := &loopStats{}
stats.readyToRun = int64(tree.MustBeDInt(datums[0]))
stats.jobsRunning = int64(tree.MustBeDInt(datums[1]))
return stats, nil
}

type savePointError struct {
err error
}
Expand Down Expand Up @@ -268,11 +234,7 @@ func withSavePoint(ctx context.Context, txn *kv.Txn, fn func() error) error {
func (s *jobScheduler) executeSchedules(
ctx context.Context, maxSchedules int64, txn *kv.Txn,
) (retErr error) {
stats, err := newLoopStats(ctx, s.env, s.InternalExecutor, txn)
if err != nil {
return err
}

var stats loopStats
defer stats.updateMetrics(&s.metrics)

findSchedulesStmt := getFindSchedulesStatement(s.env, maxSchedules)
Expand Down Expand Up @@ -308,10 +270,10 @@ func (s *jobScheduler) executeSchedules(
return contextutil.RunWithTimeout(
ctx, fmt.Sprintf("process-schedule-%d", schedule.ScheduleID()), timeout,
func(ctx context.Context) error {
return s.processSchedule(ctx, schedule, numRunning, stats, txn)
return s.processSchedule(ctx, schedule, numRunning, &stats, txn)
})
}
return s.processSchedule(ctx, schedule, numRunning, stats, txn)
return s.processSchedule(ctx, schedule, numRunning, &stats, txn)
}); processErr != nil {
if errors.HasType(processErr, (*savePointError)(nil)) {
return errors.Wrapf(processErr, "savepoint error for schedule %d", schedule.ScheduleID())
Expand Down
7 changes: 6 additions & 1 deletion pkg/jobs/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,8 +454,13 @@ func TestJobSchedulerDaemonHonorsMaxJobsLimit(t *testing.T) {
daemon := newJobScheduler(h.cfg, h.env, metric.NewRegistry())
daemon.runDaemon(ctx, stopper)

readyToRunStmt := fmt.Sprintf(
"SELECT count(*) FROM %s WHERE next_run < %s",
h.env.ScheduledJobsTableName(), h.env.NowExpr())
testutils.SucceedsSoon(t, func() error {
if ready := daemon.metrics.ReadyToRun.Value(); numJobs != ready {
var ready int
h.sqlDB.QueryRow(t, readyToRunStmt).Scan(&ready)
if ready != numJobs-jobsPerIteration {
return errors.Errorf("waiting for metric %d = %d", ready, numJobs)
}
return nil
Expand Down
18 changes: 0 additions & 18 deletions pkg/jobs/schedule_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,8 @@ func (m *ExecutorMetrics) MetricStruct() {}

// SchedulerMetrics are metrics specific to job scheduler daemon.
type SchedulerMetrics struct {
// Number of schedules that were ready to execute.
ReadyToRun *metric.Gauge
// Number of scheduled jobs started.
NumStarted *metric.Gauge
// Number of jobs started by schedules that are currently running.
NumRunning *metric.Gauge
// Number of schedules rescheduled due to SKIP policy.
RescheduleSkip *metric.Gauge
// Number of schedules rescheduled due to WAIT policy.
Expand All @@ -51,20 +47,6 @@ type SchedulerMetrics struct {
// MakeSchedulerMetrics returns metrics for scheduled job daemon.
func MakeSchedulerMetrics() SchedulerMetrics {
return SchedulerMetrics{
ReadyToRun: metric.NewGauge(metric.Metadata{
Name: "schedules.round.schedules-ready-to-run",
Help: "The number of jobs ready to execute",
Measurement: "Schedules",
Unit: metric.Unit_COUNT,
}),

NumRunning: metric.NewGauge(metric.Metadata{
Name: "schedules.round.num-jobs-running",
Help: "The number of jobs started by schedules that are currently running",
Measurement: "Jobs",
Unit: metric.Unit_COUNT,
}),

NumStarted: metric.NewGauge(metric.Metadata{
Name: "schedules.round.jobs-started",
Help: "The number of jobs started",
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"doc.go",
"drain.go",
"grpc_server.go",
"import_ts.go",
"index_usage_stats.go",
"init.go",
"init_handshake.go",
Expand Down Expand Up @@ -404,6 +405,7 @@ go_test(
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//credentials",
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//status",
"@org_golang_x_crypto//bcrypt",
],
Expand Down
11 changes: 1 addition & 10 deletions pkg/sql/compact_sql_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,6 @@ func (r *sqlStatsCompactionResumer) Resume(ctx context.Context, execCtx interfac
return err
}

// We check for concurrently running SQL Stats compaction jobs. We only allow
// one job to be running at the same time.
if err := persistedsqlstats.CheckExistingCompactionJob(ctx, r.job, ie, nil /* txn */); err != nil {
if errors.Is(err, persistedsqlstats.ErrConcurrentSQLStatsCompaction) {
log.Infof(ctx, "exiting due to a running sql stats compaction job")
}
return err
}

statsCompactor := persistedsqlstats.NewStatsCompactor(
r.st,
ie,
Expand Down Expand Up @@ -202,7 +193,7 @@ func (e *scheduledSQLStatsCompactionExecutor) createSQLStatsCompactionJob(
persistedsqlstats.CreateCompactionJob(ctx, &jobs.CreatedByInfo{
ID: sj.ScheduleID(),
Name: jobs.CreatedByScheduledJobs,
}, txn, cfg.InternalExecutor, p.(*planner).ExecCfg().JobRegistry)
}, txn, p.(*planner).ExecCfg().JobRegistry)

if err != nil {
return err
Expand Down
3 changes: 0 additions & 3 deletions pkg/sql/sqlstats/persistedsqlstats/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,13 @@ go_test(
"//pkg/jobs/jobspb",
"//pkg/jobs/jobstest",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb:with-mocks",
"//pkg/scheduledjobs",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlstats",
"//pkg/sql/sqlutil",
Expand Down
36 changes: 4 additions & 32 deletions pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,12 @@ func CreateSQLStatsCompactionScheduleIfNotYetExist(
return compactionSchedule, nil
}

// CreateCompactionJob creates a system.jobs record if there is no other
// SQL Stats compaction job running. This is invoked by the scheduled job
// Executor.
// CreateCompactionJob creates a system.jobs record.
// We do not need to worry about checking if the job already exist;
// at most 1 job semantics are enforced by scheduled jobs system.
func CreateCompactionJob(
ctx context.Context,
createdByInfo *jobs.CreatedByInfo,
txn *kv.Txn,
ie sqlutil.InternalExecutor,
jobRegistry *jobs.Registry,
ctx context.Context, createdByInfo *jobs.CreatedByInfo, txn *kv.Txn, jobRegistry *jobs.Registry,
) (jobspb.JobID, error) {
if err := CheckExistingCompactionJob(ctx, nil /* job */, ie, txn); err != nil {
return jobspb.InvalidJobID, err
}

record := jobs.Record{
Description: "automatic SQL Stats compaction",
Username: security.NodeUserName(),
Expand All @@ -108,26 +100,6 @@ func CreateCompactionJob(
return jobID, nil
}

// CheckExistingCompactionJob checks for existing SQL Stats Compaction job
// that are either PAUSED, CANCELED, or RUNNING. If so, it returns a
// ErrConcurrentSQLStatsCompaction.
func CheckExistingCompactionJob(
ctx context.Context, job *jobs.Job, ie sqlutil.InternalExecutor, txn *kv.Txn,
) error {
jobID := jobspb.InvalidJobID
if job != nil {
jobID = job.ID()
}
exists, err := jobs.RunningJobExists(ctx, jobID, ie, txn, func(payload *jobspb.Payload) bool {
return payload.Type() == jobspb.TypeAutoSQLStatsCompaction
})

if err == nil && exists {
err = ErrConcurrentSQLStatsCompaction
}
return err
}

func checkExistingCompactionSchedule(
ctx context.Context, ie sqlutil.InternalExecutor, txn *kv.Txn,
) (exists bool, _ error) {
Expand Down
111 changes: 0 additions & 111 deletions pkg/sql/sqlstats/persistedsqlstats/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
Expand Down Expand Up @@ -202,76 +197,6 @@ func TestSQLStatsCompactor(t *testing.T) {
}
}

func TestAtMostOneSQLStatsCompactionJob(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var serverArgs base.TestServerArgs
var allowRequest chan struct{}

serverArgs.Knobs.SQLStatsKnobs = &sqlstats.TestingKnobs{
AOSTClause: "AS OF SYSTEM TIME '-1us'",
}

params := base.TestClusterArgs{ServerArgs: serverArgs}
params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()

allowRequest = make(chan struct{})
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: createStatsRequestFilter(t, allowRequest),
}

ctx := context.Background()
tc := serverutils.StartNewTestCluster(t, 3 /* numNodes */, params)
defer tc.Stopper().Stop(ctx)
conn := tc.ServerConn(0 /* idx */)
sqlDB := sqlutils.MakeSQLRunner(conn)
server := tc.Server(0 /* idx */)

jobID, err := launchSQLStatsCompactionJob(server)
require.NoError(t, err)

// We wait until the job appears in the system.jobs table.
sqlDB.CheckQueryResultsRetry(
t,
fmt.Sprintf(`SELECT count(*) FROM system.jobs where id = %d`, jobID),
[][]string{{"1"}},
)

allowRequest <- struct{}{}

// Launching a second job should fail here since we are still blocking the
// the first job's execution through allowRequest. (Since we need to send
// struct{}{} twice into the channel to fully unblock it.)
_, err = launchSQLStatsCompactionJob(server)
expected := persistedsqlstats.ErrConcurrentSQLStatsCompaction.Error()
if !testutils.IsError(err, expected) {
t.Fatalf("expected '%s' error, but got %+v", expected, err)
}

allowRequest <- struct{}{}
close(allowRequest)

// We wait until the first job finishes.
sqlDB.CheckQueryResultsRetry(
t,
fmt.Sprintf(`SELECT count(*) FROM system.jobs where id = %d AND status = 'succeeded'`, jobID),
[][]string{{"1"}},
)

// Launching the job now should succeed.
jobID, err = launchSQLStatsCompactionJob(server)
require.NoError(t, err)

// Wait until the second job to finish for sanity check.
server.JobRegistry().(*jobs.Registry).TestingNudgeAdoptionQueue()
sqlDB.CheckQueryResultsRetry(
t,
fmt.Sprintf(`SELECT count(*) FROM system.jobs where id = %d AND status = 'succeeded'`, jobID),
[][]string{{"1"}},
)
}

func TestSQLStatsCompactionJobMarkedAsAutomatic(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -310,7 +235,6 @@ func TestSQLStatsCompactionJobMarkedAsAutomatic(t *testing.T) {
func launchSQLStatsCompactionJob(server serverutils.TestServerInterface) (jobspb.JobID, error) {
return persistedsqlstats.CreateCompactionJob(
context.Background(), nil /* createdByInfo */, nil, /* txn */
server.InternalExecutor().(sqlutil.InternalExecutor),
server.JobRegistry().(*jobs.Registry),
)
}
Expand Down Expand Up @@ -367,38 +291,3 @@ ORDER BY aggregated_ts`

return stmtFingerprints, txnFingerprints
}

func createStatsRequestFilter(
t *testing.T, allowToProgress chan struct{},
) kvserverbase.ReplicaRequestFilter {
// Start a test server here, so we can get the descriptor ID for the system
// table. This allows us to not hardcode the descriptor ID.
s, sqlConn, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer func() {
s.Stopper().Stop(context.Background())
err := sqlConn.Close()
require.NoError(t, err)
}()
sqlDB := sqlutils.MakeSQLRunner(sqlConn)

stmtStatsTableID, txnStatsTableID := getStatsTablesIDs(t, sqlDB)
return func(_ context.Context, ba roachpb.BatchRequest) *roachpb.Error {
if req, ok := ba.GetArg(roachpb.Scan); ok {
_, tableID, _ := encoding.DecodeUvarintAscending(req.(*roachpb.ScanRequest).Key)
if descpb.ID(tableID) == stmtStatsTableID || descpb.ID(tableID) == txnStatsTableID {
<-allowToProgress
<-allowToProgress
}
}
return nil
}
}

func getStatsTablesIDs(
t *testing.T, sqlDB *sqlutils.SQLRunner,
) (stmtStatsTableID, txnStatsTableID descpb.ID) {
stmt :=
"select 'system.statement_statistics'::regclass::oid, 'system.transaction_statistics'::regclass::oid"
sqlDB.QueryRow(t, stmt).Scan(&stmtStatsTableID, &txnStatsTableID)
return stmtStatsTableID, txnStatsTableID
}
Loading