Skip to content

Commit

Permalink
Merge #134829
Browse files Browse the repository at this point in the history
134829: backupccl: altering backup schedule no longer resumes schedule r=msbutler a=kev-cao

Previously, altering a backup schedule's recurrence or collection URI would resume the schedule even if it was paused. As we already have `RESUME SCHEDULE`, this is not necessary and can lead to unexpected surprises for the user. The new behavior is upon altering a paused schedule, the schedules remain paused regardless of if the collection URI or the recurrence is changed.

However, in the event that the collection URI is altered for a pair of active schedules, the incremental will still be paused until a full backup is fully completed.

Fixes: #128995

Epic: None

Release note (backward-incompatible change): Altering a paused backup schedule's recurrence or location no longer resumes the schedule.

Co-authored-by: Kevin Cao <[email protected]>
  • Loading branch information
craig[bot] and kev-cao committed Dec 13, 2024
2 parents ada0ea7 + 771b4f0 commit 5590074
Show file tree
Hide file tree
Showing 14 changed files with 137 additions and 34 deletions.
37 changes: 24 additions & 13 deletions pkg/backup/alter_backup_schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,13 +399,18 @@ func processRecurrence(
if recurrence == "" {
return nil
}
// Maintain the pause state of the schedule while updating the schedule.
if incJob != nil {
if err := incJob.SetSchedule(recurrence); err != nil {
return err
if incJob.IsPaused() {
incJob.SetScheduleExpr(recurrence)
} else {
return incJob.SetScheduleAndNextRun(recurrence)
}
} else {
if err := fullJob.SetSchedule(recurrence); err != nil {
return err
if fullJob.IsPaused() {
fullJob.SetScheduleExpr(recurrence)
} else {
return fullJob.SetScheduleAndNextRun(recurrence)
}
}
return nil
Expand Down Expand Up @@ -433,9 +438,7 @@ func processFullBackupRecurrence(
}
// Copy the cadence from the incremental to the full, and delete the
// incremental.
if err := s.fullJob.SetSchedule(s.incJob.ScheduleExpr()); err != nil {
return scheduleDetails{}, err
}
s.fullJob.SetScheduleExpr(s.incJob.ScheduleExpr())
s.fullArgs.DependentScheduleID = 0
s.fullArgs.UnpauseOnSuccess = 0
if err := scheduledJobs.Delete(ctx, s.incJob); err != nil {
Expand Down Expand Up @@ -504,8 +507,12 @@ func processFullBackupRecurrence(
}
// We have an incremental backup at this point.
// Make no (further) changes, and just edit the cadence on the full.
if err := s.fullJob.SetSchedule(fullBackupRecurrence); err != nil {
return scheduleDetails{}, err
if s.fullJob.IsPaused() {
s.fullJob.SetScheduleExpr(fullBackupRecurrence)
} else {
if err := s.fullJob.SetScheduleAndNextRun(fullBackupRecurrence); err != nil {
return scheduleDetails{}, err
}
}

fullAny, err := pbtypes.MarshalAny(s.fullArgs)
Expand Down Expand Up @@ -573,14 +580,18 @@ func processInto(p sql.PlanHookState, spec *alterBackupScheduleSpec, s scheduleD

// With a new destination, no full backup has completed yet.
// Pause incrementals until a full backup completes.
incPaused := s.incJob.IsPaused()
s.incJob.Pause()
s.incJob.SetScheduleStatus("Waiting for initial backup to complete")
s.fullArgs.UnpauseOnSuccess = s.incJob.ScheduleID()

// Kick off a full backup immediately so we can unpause incrementals.
// This mirrors the behavior of CREATE SCHEDULE FOR BACKUP.
env := sql.JobSchedulerEnv(p.ExecCfg().JobsKnobs())
s.fullJob.SetNextRun(env.Now())
// If the inc schedule was not already paused, kick off a full backup immediately
// so we can unpause incrementals. This mirrors the behavior of
// CREATE SCHEDULE FOR BACKUP.
if !incPaused {
env := sql.JobSchedulerEnv(p.ExecCfg().JobsKnobs())
s.fullJob.SetNextRun(env.Now())
}

return nil
}
Expand Down
86 changes: 86 additions & 0 deletions pkg/backup/alter_backup_schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,89 @@ func TestAlterBackupScheduleSetsIncrementalClusterID(t *testing.T) {
)
require.Len(t, rows, 2)
}

func TestAlterBackupScheduleDoesNotResumePausedSchedules(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

th, cleanup := newAlterSchedulesTestHelper(t, nil)
defer cleanup()

t.Run("standalone paused full backup is not resumed", func(t *testing.T) {
createCmd := "CREATE SCHEDULE FOR BACKUP INTO 'nodelocal://1/backup/alter-schedule' RECURRING '@daily' FULL BACKUP ALWAYS;"
rows := th.sqlDB.QueryStr(t, createCmd)
require.Len(t, rows, 1)
scheduleID, err := strconv.Atoi(rows[0][0])
require.NoError(t, err)

th.sqlDB.Exec(t, fmt.Sprintf(`PAUSE SCHEDULE %d;`, scheduleID))
alterCmd := fmt.Sprintf(`ALTER BACKUP SCHEDULE %d SET RECURRING '@hourly';`, scheduleID)
th.sqlDB.Exec(t, alterCmd)

status, recurrence := scheduleStatusAndRecurrence(t, th, scheduleID)
require.Equal(t, "PAUSED", status)
require.Equal(t, "@hourly", recurrence)
})

t.Run("paused incremental and full backup pair is not resumed", func(t *testing.T) {
createCmd := "CREATE SCHEDULE FOR BACKUP INTO 'nodelocal://1/backup/alter-schedule' RECURRING '@daily' FULL BACKUP '@daily';"
rows := th.sqlDB.QueryStr(t, createCmd)
require.Len(t, rows, 2)
incID, err := strconv.Atoi(rows[0][0])
require.NoError(t, err)
fullID, err := strconv.Atoi(rows[1][0])
require.NoError(t, err)

th.sqlDB.Exec(t, fmt.Sprintf(`PAUSE SCHEDULE %d;`, incID))
th.sqlDB.Exec(t, fmt.Sprintf(`PAUSE SCHEDULE %d;`, fullID))

alterCmd := fmt.Sprintf("ALTER BACKUP SCHEDULE %d SET RECURRING '*/30 * * * *', SET FULL BACKUP '@hourly';", incID)
th.sqlDB.Exec(t, alterCmd)

incStatus, incRecurrence := scheduleStatusAndRecurrence(t, th, incID)
require.Equal(t, "PAUSED", incStatus)
require.Equal(t, "*/30 * * * *", incRecurrence)
fullStatus, fullRecurrence := scheduleStatusAndRecurrence(t, th, fullID)
require.Equal(t, "PAUSED", fullStatus)
require.Equal(t, "@hourly", fullRecurrence)
})
}

func TestAlterBackupSchedulePausesIncrementalForNewCollection(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

th, cleanup := newAlterSchedulesTestHelper(t, nil)
defer cleanup()

createCmd := "CREATE SCHEDULE FOR BACKUP INTO 'nodelocal://1/backup/alter-schedule' RECURRING '@hourly' FULL BACKUP '@daily';"
rows := th.sqlDB.QueryStr(t, createCmd)
require.Len(t, rows, 2)
incID, err := strconv.Atoi(rows[0][0])
require.NoError(t, err)
fullID, err := strconv.Atoi(rows[1][0])
require.NoError(t, err)

// Artificially resume inc schedule to test if it gets paused after the alter
th.sqlDB.Exec(t, `RESUME SCHEDULE $1`, incID)

alterCmd := fmt.Sprintf(`ALTER BACKUP SCHEDULE %d SET INTO 'nodelocal://1/backup/alter-schedule-2';`, fullID)
th.sqlDB.Exec(t, alterCmd)

incStatus, incRecurrence := scheduleStatusAndRecurrence(t, th, incID)
require.Equal(t, "PAUSED", incStatus)
require.Equal(t, "@hourly", incRecurrence)
fullStatus, fullRecurrence := scheduleStatusAndRecurrence(t, th, fullID)
require.Equal(t, "ACTIVE", fullStatus)
require.Equal(t, "@daily", fullRecurrence)
}

func scheduleStatusAndRecurrence(
t *testing.T, th *alterSchedulesTestHelper, id int,
) (status string, recurrence string) {
t.Helper()
th.sqlDB.
QueryRow(t, `SELECT schedule_status, recurrence FROM [SHOW SCHEDULES] WHERE id=$1`, id).
Scan(&status, &recurrence)
return status, recurrence
}
2 changes: 1 addition & 1 deletion pkg/backup/create_scheduled_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ func makeBackupSchedule(
args.BackupType = backuppb.ScheduledBackupExecutionArgs_FULL
}

if err := sj.SetSchedule(recurrence.Cron); err != nil {
if err := sj.SetScheduleAndNextRun(recurrence.Cron); err != nil {
return nil, nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/scheduled_changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func makeChangefeedSchedule(
sj.SetScheduleLabel(label)
sj.SetOwner(owner)

if err := sj.SetSchedule(recurrence.Cron); err != nil {
if err := sj.SetScheduleAndNextRun(recurrence.Cron); err != nil {
return nil, err
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/delegate_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestScheduleControl(t *testing.T) {
makeSchedule := func(name string, cron string) jobspb.ScheduleID {
schedule := th.newScheduledJob(t, name, "sql")
if cron != "" {
require.NoError(t, schedule.SetSchedule(cron))
require.NoError(t, schedule.SetScheduleAndNextRun(cron))
}

require.NoError(t, schedules.Create(ctx, schedule))
Expand All @@ -77,7 +77,7 @@ func TestScheduleControl(t *testing.T) {

t.Run("pause-active-schedule", func(t *testing.T) {
schedule := th.newScheduledJob(t, "test schedule", "select 42")
require.NoError(t, schedule.SetSchedule("@weekly"))
require.NoError(t, schedule.SetScheduleAndNextRun("@weekly"))
// Datums only store up until microseconds.
ms := time.Microsecond
firstRunTime := timeutil.Now().Add(10 * time.Second).Truncate(ms)
Expand Down
2 changes: 1 addition & 1 deletion pkg/jobs/executor_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestInlineExecutorFailedJobsHandling(t *testing.T) {
j := h.newScheduledJob(t, "test_job", "test sql")
j.rec.ExecutorType = InlineExecutorName

require.NoError(t, j.SetSchedule("@daily"))
require.NoError(t, j.SetScheduleAndNextRun("@daily"))
j.SetScheduleDetails(jobstest.AddDummyScheduleDetails(jobspb.ScheduleDetails{OnError: test.onError}))

ctx := context.Background()
Expand Down
8 changes: 4 additions & 4 deletions pkg/jobs/job_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestJobSchedulerReschedulesRunning(t *testing.T) {
details := j.ScheduleDetails()
details.Wait = wait
j.SetScheduleDetails(*details)
require.NoError(t, j.SetSchedule("@hourly"))
require.NoError(t, j.SetScheduleAndNextRun("@hourly"))

require.NoError(t,
h.cfg.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestJobSchedulerExecutesAfterTerminal(t *testing.T) {
// Create job that waits for the previous runs to finish.
j := h.newScheduledJob(t, "j", "SELECT 42 AS meaning_of_life;")
j.SetScheduleDetails(jobstest.AddDummyScheduleDetails(jobspb.ScheduleDetails{Wait: wait}))
require.NoError(t, j.SetSchedule("@hourly"))
require.NoError(t, j.SetScheduleAndNextRun("@hourly"))

require.NoError(t,
h.cfg.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestJobSchedulerExecutesAndSchedulesNextRun(t *testing.T) {

// Create job that waits for the previous runs to finish.
j := h.newScheduledJob(t, "j", "SELECT 42 AS meaning_of_life;")
require.NoError(t, j.SetSchedule("@hourly"))
require.NoError(t, j.SetScheduleAndNextRun("@hourly"))

require.NoError(t,
h.cfg.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
Expand Down Expand Up @@ -558,7 +558,7 @@ func TestJobSchedulerRetriesFailed(t *testing.T) {
t.Run(tc.onError.String(), func(t *testing.T) {
h.env.SetTime(startTime)
schedule.SetScheduleDetails(jobstest.AddDummyScheduleDetails(jobspb.ScheduleDetails{OnError: tc.onError}))
require.NoError(t, schedule.SetSchedule("@hourly"))
require.NoError(t, schedule.SetScheduleAndNextRun("@hourly"))
require.NoError(t, schedules.Update(ctx, schedule))

h.env.SetTime(execTime)
Expand Down
10 changes: 8 additions & 2 deletions pkg/jobs/scheduled_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,20 @@ func (j *ScheduledJob) ExecutionArgs() *jobspb.ExecutionArguments {
return &j.rec.ExecutionArgs
}

// SetSchedule updates periodicity of this schedule, and updates this schedules
// SetScheduleAndNextRun updates periodicity of this schedule, and updates this schedules
// next run time.
func (j *ScheduledJob) SetSchedule(scheduleExpr string) error {
func (j *ScheduledJob) SetScheduleAndNextRun(scheduleExpr string) error {
j.rec.ScheduleExpr = scheduleExpr
j.markDirty("schedule_expr")
return j.ScheduleNextRun()
}

// SetScheduleExpr updates schedule expression for this schedule without updating next run time.
func (j *ScheduledJob) SetScheduleExpr(scheduleExpr string) {
j.rec.ScheduleExpr = scheduleExpr
j.markDirty("schedule_expr")
}

// HasRecurringSchedule returns true if this schedule job runs periodically.
func (j *ScheduledJob) HasRecurringSchedule() bool {
return len(j.rec.ScheduleExpr) > 0
Expand Down
8 changes: 4 additions & 4 deletions pkg/jobs/scheduled_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestCreateScheduledJob(t *testing.T) {

schedules := ScheduledJobDB(h.cfg.DB)
j := h.newScheduledJob(t, "test_job", "test sql")
require.NoError(t, j.SetSchedule("@daily"))
require.NoError(t, j.SetScheduleAndNextRun("@daily"))
require.NoError(t, schedules.Create(context.Background(), j))
require.True(t, j.ScheduleID() > 0)
}
Expand All @@ -43,7 +43,7 @@ func TestCreatePausedScheduledJob(t *testing.T) {
defer cleanup()

j := h.newScheduledJob(t, "test_job", "test sql")
require.NoError(t, j.SetSchedule("@daily"))
require.NoError(t, j.SetScheduleAndNextRun("@daily"))
schedules := ScheduledJobDB(h.cfg.DB)
j.Pause()
require.NoError(t, schedules.Create(context.Background(), j))
Expand All @@ -60,7 +60,7 @@ func TestSetsSchedule(t *testing.T) {
j := h.newScheduledJob(t, "test_job", "test sql")

// Set job schedule to run "@daily" -- i.e. at midnight.
require.NoError(t, j.SetSchedule("@daily"))
require.NoError(t, j.SetScheduleAndNextRun("@daily"))

// The job is expected to run at midnight the next day.
// We want to ensure nextRun correctly persisted in the cron table.
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestPauseUnpauseJob(t *testing.T) {
schedules := ScheduledJobDB(h.cfg.DB)
ctx := context.Background()
j := h.newScheduledJob(t, "test_job", "test sql")
require.NoError(t, j.SetSchedule("@daily"))
require.NoError(t, j.SetScheduleAndNextRun("@daily"))
require.NoError(t, schedules.Create(ctx, j))

// Pause and save.
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/alter_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2114,7 +2114,7 @@ func handleTTLStorageParamChange(
if err != nil {
return false, err
}
if err := s.SetSchedule(after.DeletionCronOrDefault()); err != nil {
if err := s.SetScheduleAndNextRun(after.DeletionCronOrDefault()); err != nil {
return false, err
}
if err := schedules.Update(params.ctx, s); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func updateSchedule(ctx context.Context, db isql.DB, st *cluster.Settings, clust
if sj.ScheduleExpr() == cronExpr {
return nil
}
if err := sj.SetSchedule(cronExpr); err != nil {
if err := sj.SetScheduleAndNextRun(cronExpr); err != nil {
return err
}
sj.SetScheduleStatus(string(jobs.StatusPending))
Expand Down Expand Up @@ -219,7 +219,7 @@ func CreateSchemaTelemetrySchedule(
scheduledJob := jobs.NewScheduledJob(scheduledjobs.ProdJobSchedulerEnv)

schedule := SchemaTelemetryRecurrence.Get(&st.SV)
if err := scheduledJob.SetSchedule(schedule); err != nil {
if err := scheduledJob.SetScheduleAndNextRun(schedule); err != nil {
return nil, err
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/create_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2514,7 +2514,7 @@ func newRowLevelTTLScheduledJob(
CreationClusterVersion: clusterVersion,
})

if err := sj.SetSchedule(tblDesc.RowLevelTTL.DeletionCronOrDefault()); err != nil {
if err := sj.SetScheduleAndNextRun(tblDesc.RowLevelTTL.DeletionCronOrDefault()); err != nil {
return nil, err
}
args := &catpb.ScheduledRowLevelTTLArgs{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func CreateSQLStatsCompactionScheduleIfNotYetExist(
schedule := scheduledjobs.MaybeRewriteCronExpr(
clusterID, SQLStatsCleanupRecurrence.Get(&st.SV),
)
if err := compactionSchedule.SetSchedule(schedule); err != nil {
if err := compactionSchedule.SetScheduleAndNextRun(schedule); err != nil {
return nil, err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (j *jobMonitor) updateSchedule(ctx context.Context, cronExpr string) {
if sj.ScheduleExpr() == cronExpr {
return nil
}
if err := sj.SetSchedule(cronExpr); err != nil {
if err := sj.SetScheduleAndNextRun(cronExpr); err != nil {
return err
}
sj.SetScheduleStatus(string(jobs.StatusPending))
Expand Down

0 comments on commit 5590074

Please sign in to comment.