Skip to content

Commit

Permalink
jobs: don't reset next_run when resuming active schedules
Browse files Browse the repository at this point in the history
Consider an active schedule that was created with a specific first_run
time. The first_run would populate the next_run_time on the schedule.
The resumption of this schedule before it executed would re-evaluate the
next runtime based off the schedule's recurrence.

This commit changes the scheduling system to only recompute the next run
time on paused schedules.

Release justification: bug fix

Release note (bug fix): Fix a bug where resuming an active schedule
would always reset its next run time. This was sometimes undesirable
with schedules that had a first_run option specified.
  • Loading branch information
pbardea committed Sep 2, 2021
1 parent d7d56e8 commit 3e0bbe1
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
18 changes: 18 additions & 0 deletions pkg/jobs/schedule_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -75,6 +76,23 @@ func TestScheduleControl(t *testing.T) {
require.True(t, th.loadSchedule(t, scheduleID).IsPaused())
})

t.Run("pause-active-schedule", func(t *testing.T) {
schedule := th.newScheduledJob(t, "test schedule", "select 42")
require.NoError(t, schedule.SetSchedule("@weekly"))
// Datums only store up until microseconds.
ms := time.Microsecond
firstRunTime := timeutil.Now().Add(10 * time.Second).Truncate(ms)
schedule.SetNextRun(firstRunTime)
require.NoError(t, schedule.Create(ctx, th.cfg.InternalExecutor, nil))
scheduleID := schedule.ScheduleID()
require.Equal(t, schedule.NextRun(), firstRunTime)
th.sqlDB.Exec(t, "RESUME SCHEDULE $1", scheduleID)

afterSchedule := th.loadSchedule(t, scheduleID)
require.False(t, afterSchedule.IsPaused())
require.Equal(t, afterSchedule.NextRun(), firstRunTime)
})

t.Run("cannot-resume-one-off-schedule", func(t *testing.T) {
schedule := th.newScheduledJob(t, "test schedule", "select 42")
require.NoError(t, schedule.Create(ctx, th.cfg.InternalExecutor, nil))
Expand Down
12 changes: 8 additions & 4 deletions pkg/sql/control_schedules.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func loadSchedule(params runParams, scheduleID tree.Datum) (*jobs.ScheduledJob,
"load-schedule",
params.EvalContext().Txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()},
fmt.Sprintf(
"SELECT schedule_id, schedule_expr, executor_type, execution_args FROM %s WHERE schedule_id = $1",
"SELECT schedule_id, next_run, schedule_expr, executor_type, execution_args FROM %s WHERE schedule_id = $1",
env.ScheduledJobsTableName(),
),
scheduleID)
Expand Down Expand Up @@ -138,9 +138,13 @@ func (n *controlSchedulesNode) startExec(params runParams) error {
schedule.Pause()
err = updateSchedule(params, schedule)
case tree.ResumeSchedule:
err = schedule.ScheduleNextRun()
if err == nil {
err = updateSchedule(params, schedule)
// Only schedule the next run time on PAUSED schedules, since ACTIVE schedules may
// have a custom next run time set by first_run.
if schedule.IsPaused() {
err = schedule.ScheduleNextRun()
if err == nil {
err = updateSchedule(params, schedule)
}
}
case tree.DropSchedule:
var ex jobs.ScheduledJobExecutor
Expand Down

0 comments on commit 3e0bbe1

Please sign in to comment.