From 4f7759928b34c2b018cddd8d416bbdbfd8fa86e0 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Wed, 19 Aug 2020 11:16:05 -0400 Subject: [PATCH] bulkio: Fix transaction semantics in job scheduler. Use transaction when querying for the schedules to run. In addition, ensure that a single bad schedule does not cause all of the previous work to be wasted by using transaction safepoints. Release Notes: None --- pkg/jobs/job_scheduler.go | 20 +++++++++-- pkg/jobs/job_scheduler_test.go | 61 ++++++++++++++++++++++++++++++++-- 2 files changed, 75 insertions(+), 6 deletions(-) diff --git a/pkg/jobs/job_scheduler.go b/pkg/jobs/job_scheduler.go index 6ecc863c05fe..884fa353868d 100644 --- a/pkg/jobs/job_scheduler.go +++ b/pkg/jobs/job_scheduler.go @@ -236,8 +236,10 @@ func (s *jobScheduler) executeSchedules( defer stats.updateMetrics(&s.metrics) findSchedulesStmt := getFindSchedulesStatement(s.env, maxSchedules) - rows, cols, err := s.InternalExecutor.QueryWithCols(ctx, "find-scheduled-jobs", nil, - sqlbase.InternalExecutorSessionDataOverride{User: security.RootUser}, + rows, cols, err := s.InternalExecutor.QueryWithCols( + ctx, "find-scheduled-jobs", + txn, + sqlbase.InternalExecutorSessionDataOverride{User: security.NodeUser}, findSchedulesStmt) if err != nil { @@ -252,8 +254,20 @@ func (s *jobScheduler) executeSchedules( continue } + sp, err := txn.CreateSavepoint(ctx) + if err != nil { + return err + } + if err := s.processSchedule(ctx, schedule, numRunning, stats, txn); err != nil { - // We don't know if txn is good at this point, so bail out. + log.Errorf(ctx, "error processing schedule %d: %+v", schedule.ScheduleID(), err) + + if err := txn.RollbackToSavepoint(ctx, sp); err != nil { + return errors.Wrapf(err, "failed to rollback savepoint for schedule %d", schedule.ScheduleID()) + } + } + + if err := txn.ReleaseSavepoint(ctx, sp); err != nil { return err } } diff --git a/pkg/jobs/job_scheduler_test.go b/pkg/jobs/job_scheduler_test.go index 9c70620cd051..25ca61df5dd8 100644 --- a/pkg/jobs/job_scheduler_test.go +++ b/pkg/jobs/job_scheduler_test.go @@ -26,7 +26,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -43,8 +42,6 @@ func TestJobSchedulerReschedulesRunning(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 52959) - h, cleanup := newTestHelper(t) defer cleanup() @@ -425,6 +422,64 @@ func TestJobSchedulerDaemonHonorsMaxJobsLimit(t *testing.T) { stopper.Stop(ctx) } +// returnErrorExecutor counts the number of times it is +// called, and always returns an error. +type returnErrorExecutor struct { + numCalls int +} + +func (e *returnErrorExecutor) ExecuteJob( + _ context.Context, + _ *scheduledjobs.JobExecutionConfig, + _ scheduledjobs.JobSchedulerEnv, + schedule *ScheduledJob, + _ *kv.Txn, +) error { + e.numCalls++ + return errors.Newf("error for schedule %d", schedule.ScheduleID()) +} + +func (e *returnErrorExecutor) NotifyJobTermination( + _ context.Context, _ int64, _ Status, _ *ScheduledJob, _ *kv.Txn, +) error { + return nil +} + +func (e *returnErrorExecutor) Metrics() metric.Struct { + return nil +} + +var _ ScheduledJobExecutor = &returnErrorExecutor{} + +func TestJobSchedulerToleratesBadSchedules(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + h, cleanup := newTestHelper(t) + defer cleanup() + + ctx := context.Background() + + const executorName = "return_error" + ex := &returnErrorExecutor{} + defer registerScopedScheduledJobExecutor(executorName, ex)() + + // Create few one-off schedules. + const numJobs = 5 + scheduleRunTime := h.env.Now().Add(time.Hour) + for i := 0; i < numJobs; i++ { + s := h.newScheduledJobForExecutor("schedule", executorName, nil) + s.SetNextRun(scheduleRunTime) + require.NoError(t, s.Create(ctx, h.cfg.InternalExecutor, nil)) + } + h.env.SetTime(scheduleRunTime.Add(time.Second)) + daemon := newJobScheduler(h.cfg, h.env, metric.NewRegistry()) + require.NoError(t, + h.cfg.DB.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + return daemon.executeSchedules(ctx, numJobs, txn) + })) + require.Equal(t, numJobs, ex.numCalls) +} + func TestJobSchedulerDaemonUsesSystemTables(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t)