From e6e3287dee56800469b29f62e03438fccfc64393 Mon Sep 17 00:00:00 2001 From: Kevin Cao Date: Wed, 6 Nov 2024 22:09:52 -0500 Subject: [PATCH 1/2] backupccl: altering backup schedule no longer resumes schedule Previously, altering a backup schedule's recurrence or collection URI would resume the schedule even if it was paused. As we already have `RESUME SCHEDULE`, this is not necessary and can lead to unexpected surprises for the user. The new behavior is upon altering a paused schedule, the schedules remain paused regardless of if the collection URI or the recurrence is changed. However, in the event that the collection URI is altered for a pair of active schedules, the incremental will still be paused until a full backup is fully completed. Fixes: #128995 Epic: None Release note (backward-incompatible change): Altering a paused backup schedule's recurrence or location no longer resumes the schedule. --- pkg/backup/alter_backup_schedule.go | 37 +++++++----- pkg/backup/alter_backup_schedule_test.go | 57 +++++++++++++++++++ pkg/backup/create_scheduled_backup.go | 2 +- pkg/ccl/changefeedccl/scheduled_changefeed.go | 2 +- pkg/jobs/delegate_control_test.go | 4 +- pkg/jobs/executor_impl_test.go | 2 +- pkg/jobs/job_scheduler_test.go | 8 +-- pkg/jobs/scheduled_job.go | 10 +++- pkg/jobs/scheduled_job_test.go | 8 +-- pkg/sql/alter_table.go | 2 +- .../schematelemetrycontroller/controller.go | 4 +- pkg/sql/create_table.go | 2 +- .../compaction_scheduling.go | 2 +- .../scheduled_job_monitor.go | 2 +- 14 files changed, 108 insertions(+), 34 deletions(-) diff --git a/pkg/backup/alter_backup_schedule.go b/pkg/backup/alter_backup_schedule.go index 3fb151bf7265..e053f771525e 100644 --- a/pkg/backup/alter_backup_schedule.go +++ b/pkg/backup/alter_backup_schedule.go @@ -399,13 +399,18 @@ func processRecurrence( if recurrence == "" { return nil } + // Maintain the pause state of the schedule while updating the schedule. if incJob != nil { - if err := incJob.SetSchedule(recurrence); err != nil { - return err + if incJob.IsPaused() { + incJob.SetScheduleExpr(recurrence) + } else { + return incJob.SetScheduleAndNextRun(recurrence) } } else { - if err := fullJob.SetSchedule(recurrence); err != nil { - return err + if fullJob.IsPaused() { + fullJob.SetScheduleExpr(recurrence) + } else { + return fullJob.SetScheduleAndNextRun(recurrence) } } return nil @@ -433,9 +438,7 @@ func processFullBackupRecurrence( } // Copy the cadence from the incremental to the full, and delete the // incremental. - if err := s.fullJob.SetSchedule(s.incJob.ScheduleExpr()); err != nil { - return scheduleDetails{}, err - } + s.fullJob.SetScheduleExpr(s.incJob.ScheduleExpr()) s.fullArgs.DependentScheduleID = 0 s.fullArgs.UnpauseOnSuccess = 0 if err := scheduledJobs.Delete(ctx, s.incJob); err != nil { @@ -504,8 +507,12 @@ func processFullBackupRecurrence( } // We have an incremental backup at this point. // Make no (further) changes, and just edit the cadence on the full. - if err := s.fullJob.SetSchedule(fullBackupRecurrence); err != nil { - return scheduleDetails{}, err + if s.fullJob.IsPaused() { + s.fullJob.SetScheduleExpr(fullBackupRecurrence) + } else { + if err := s.fullJob.SetScheduleAndNextRun(fullBackupRecurrence); err != nil { + return scheduleDetails{}, err + } } fullAny, err := pbtypes.MarshalAny(s.fullArgs) @@ -573,14 +580,18 @@ func processInto(p sql.PlanHookState, spec *alterBackupScheduleSpec, s scheduleD // With a new destination, no full backup has completed yet. // Pause incrementals until a full backup completes. + incPaused := s.incJob.IsPaused() s.incJob.Pause() s.incJob.SetScheduleStatus("Waiting for initial backup to complete") s.fullArgs.UnpauseOnSuccess = s.incJob.ScheduleID() - // Kick off a full backup immediately so we can unpause incrementals. - // This mirrors the behavior of CREATE SCHEDULE FOR BACKUP. - env := sql.JobSchedulerEnv(p.ExecCfg().JobsKnobs()) - s.fullJob.SetNextRun(env.Now()) + // If the inc schedule was not already paused, kick off a full backup immediately + // so we can unpause incrementals. This mirrors the behavior of + // CREATE SCHEDULE FOR BACKUP. + if !incPaused { + env := sql.JobSchedulerEnv(p.ExecCfg().JobsKnobs()) + s.fullJob.SetNextRun(env.Now()) + } return nil } diff --git a/pkg/backup/alter_backup_schedule_test.go b/pkg/backup/alter_backup_schedule_test.go index 51bf09d4400c..2458fe7da32d 100644 --- a/pkg/backup/alter_backup_schedule_test.go +++ b/pkg/backup/alter_backup_schedule_test.go @@ -194,3 +194,60 @@ func TestAlterBackupScheduleSetsIncrementalClusterID(t *testing.T) { ) require.Len(t, rows, 2) } + +func TestAlterBackupScheduleDoesNotResumePausedSchedules(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + th, cleanup := newAlterSchedulesTestHelper(t, nil) + defer cleanup() + + t.Run("standalone paused full backup is not resumed", func(t *testing.T) { + createCmd := "CREATE SCHEDULE FOR BACKUP INTO 'nodelocal://1/backup/alter-schedule' RECURRING '@daily' FULL BACKUP ALWAYS;" + rows := th.sqlDB.QueryStr(t, createCmd) + require.Len(t, rows, 1) + scheduleID, err := strconv.Atoi(rows[0][0]) + require.NoError(t, err) + + th.sqlDB.Exec(t, fmt.Sprintf(`PAUSE SCHEDULE %d;`, scheduleID)) + alterCmd := fmt.Sprintf(`ALTER BACKUP SCHEDULE %d SET RECURRING '@hourly';`, scheduleID) + th.sqlDB.Exec(t, alterCmd) + + status, recurrence := scheduleStatusAndRecurrence(t, th, scheduleID) + require.Equal(t, "PAUSED", status) + require.Equal(t, "@hourly", recurrence) + }) + + t.Run("paused incremental and full backup pair is not resumed", func(t *testing.T) { + createCmd := "CREATE SCHEDULE FOR BACKUP INTO 'nodelocal://1/backup/alter-schedule' RECURRING '@daily' FULL BACKUP '@daily';" + rows := th.sqlDB.QueryStr(t, createCmd) + require.Len(t, rows, 2) + incID, err := strconv.Atoi(rows[0][0]) + require.NoError(t, err) + fullID, err := strconv.Atoi(rows[1][0]) + require.NoError(t, err) + + th.sqlDB.Exec(t, fmt.Sprintf(`PAUSE SCHEDULE %d;`, incID)) + th.sqlDB.Exec(t, fmt.Sprintf(`PAUSE SCHEDULE %d;`, fullID)) + + alterCmd := fmt.Sprintf("ALTER BACKUP SCHEDULE %d SET RECURRING '*/30 * * * *', SET FULL BACKUP '@hourly';", incID) + th.sqlDB.Exec(t, alterCmd) + + incStatus, incRecurrence := scheduleStatusAndRecurrence(t, th, incID) + require.Equal(t, "PAUSED", incStatus) + require.Equal(t, "*/30 * * * *", incRecurrence) + fullStatus, fullRecurrence := scheduleStatusAndRecurrence(t, th, fullID) + require.Equal(t, "PAUSED", fullStatus) + require.Equal(t, "@hourly", fullRecurrence) + }) +} + +func scheduleStatusAndRecurrence( + t *testing.T, th *alterSchedulesTestHelper, id int, +) (status string, recurrence string) { + t.Helper() + th.sqlDB. + QueryRow(t, `SELECT schedule_status, recurrence FROM [SHOW SCHEDULES] WHERE id=$1`, id). + Scan(&status, &recurrence) + return status, recurrence +} diff --git a/pkg/backup/create_scheduled_backup.go b/pkg/backup/create_scheduled_backup.go index f07749dc83d8..c57c62f06c52 100644 --- a/pkg/backup/create_scheduled_backup.go +++ b/pkg/backup/create_scheduled_backup.go @@ -492,7 +492,7 @@ func makeBackupSchedule( args.BackupType = backuppb.ScheduledBackupExecutionArgs_FULL } - if err := sj.SetSchedule(recurrence.Cron); err != nil { + if err := sj.SetScheduleAndNextRun(recurrence.Cron); err != nil { return nil, nil, err } diff --git a/pkg/ccl/changefeedccl/scheduled_changefeed.go b/pkg/ccl/changefeedccl/scheduled_changefeed.go index a0e0a5725115..ef978e7af618 100644 --- a/pkg/ccl/changefeedccl/scheduled_changefeed.go +++ b/pkg/ccl/changefeedccl/scheduled_changefeed.go @@ -383,7 +383,7 @@ func makeChangefeedSchedule( sj.SetScheduleLabel(label) sj.SetOwner(owner) - if err := sj.SetSchedule(recurrence.Cron); err != nil { + if err := sj.SetScheduleAndNextRun(recurrence.Cron); err != nil { return nil, err } diff --git a/pkg/jobs/delegate_control_test.go b/pkg/jobs/delegate_control_test.go index 98938e892146..fa3dc387c48e 100644 --- a/pkg/jobs/delegate_control_test.go +++ b/pkg/jobs/delegate_control_test.go @@ -56,7 +56,7 @@ func TestScheduleControl(t *testing.T) { makeSchedule := func(name string, cron string) jobspb.ScheduleID { schedule := th.newScheduledJob(t, name, "sql") if cron != "" { - require.NoError(t, schedule.SetSchedule(cron)) + require.NoError(t, schedule.SetScheduleAndNextRun(cron)) } require.NoError(t, schedules.Create(ctx, schedule)) @@ -77,7 +77,7 @@ func TestScheduleControl(t *testing.T) { t.Run("pause-active-schedule", func(t *testing.T) { schedule := th.newScheduledJob(t, "test schedule", "select 42") - require.NoError(t, schedule.SetSchedule("@weekly")) + require.NoError(t, schedule.SetScheduleAndNextRun("@weekly")) // Datums only store up until microseconds. ms := time.Microsecond firstRunTime := timeutil.Now().Add(10 * time.Second).Truncate(ms) diff --git a/pkg/jobs/executor_impl_test.go b/pkg/jobs/executor_impl_test.go index b4e114839a9f..21e3fc805c76 100644 --- a/pkg/jobs/executor_impl_test.go +++ b/pkg/jobs/executor_impl_test.go @@ -53,7 +53,7 @@ func TestInlineExecutorFailedJobsHandling(t *testing.T) { j := h.newScheduledJob(t, "test_job", "test sql") j.rec.ExecutorType = InlineExecutorName - require.NoError(t, j.SetSchedule("@daily")) + require.NoError(t, j.SetScheduleAndNextRun("@daily")) j.SetScheduleDetails(jobstest.AddDummyScheduleDetails(jobspb.ScheduleDetails{OnError: test.onError})) ctx := context.Background() diff --git a/pkg/jobs/job_scheduler_test.go b/pkg/jobs/job_scheduler_test.go index e165089ab3a6..36a004877313 100644 --- a/pkg/jobs/job_scheduler_test.go +++ b/pkg/jobs/job_scheduler_test.go @@ -63,7 +63,7 @@ func TestJobSchedulerReschedulesRunning(t *testing.T) { details := j.ScheduleDetails() details.Wait = wait j.SetScheduleDetails(*details) - require.NoError(t, j.SetSchedule("@hourly")) + require.NoError(t, j.SetScheduleAndNextRun("@hourly")) require.NoError(t, h.cfg.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { @@ -120,7 +120,7 @@ func TestJobSchedulerExecutesAfterTerminal(t *testing.T) { // Create job that waits for the previous runs to finish. j := h.newScheduledJob(t, "j", "SELECT 42 AS meaning_of_life;") j.SetScheduleDetails(jobstest.AddDummyScheduleDetails(jobspb.ScheduleDetails{Wait: wait})) - require.NoError(t, j.SetSchedule("@hourly")) + require.NoError(t, j.SetScheduleAndNextRun("@hourly")) require.NoError(t, h.cfg.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { @@ -164,7 +164,7 @@ func TestJobSchedulerExecutesAndSchedulesNextRun(t *testing.T) { // Create job that waits for the previous runs to finish. j := h.newScheduledJob(t, "j", "SELECT 42 AS meaning_of_life;") - require.NoError(t, j.SetSchedule("@hourly")) + require.NoError(t, j.SetScheduleAndNextRun("@hourly")) require.NoError(t, h.cfg.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { @@ -558,7 +558,7 @@ func TestJobSchedulerRetriesFailed(t *testing.T) { t.Run(tc.onError.String(), func(t *testing.T) { h.env.SetTime(startTime) schedule.SetScheduleDetails(jobstest.AddDummyScheduleDetails(jobspb.ScheduleDetails{OnError: tc.onError})) - require.NoError(t, schedule.SetSchedule("@hourly")) + require.NoError(t, schedule.SetScheduleAndNextRun("@hourly")) require.NoError(t, schedules.Update(ctx, schedule)) h.env.SetTime(execTime) diff --git a/pkg/jobs/scheduled_job.go b/pkg/jobs/scheduled_job.go index 276833516da6..3262b3c18bbc 100644 --- a/pkg/jobs/scheduled_job.go +++ b/pkg/jobs/scheduled_job.go @@ -171,14 +171,20 @@ func (j *ScheduledJob) ExecutionArgs() *jobspb.ExecutionArguments { return &j.rec.ExecutionArgs } -// SetSchedule updates periodicity of this schedule, and updates this schedules +// SetScheduleAndNextRun updates periodicity of this schedule, and updates this schedules // next run time. -func (j *ScheduledJob) SetSchedule(scheduleExpr string) error { +func (j *ScheduledJob) SetScheduleAndNextRun(scheduleExpr string) error { j.rec.ScheduleExpr = scheduleExpr j.markDirty("schedule_expr") return j.ScheduleNextRun() } +// SetScheduleExpr updates schedule expression for this schedule without updating next run time. +func (j *ScheduledJob) SetScheduleExpr(scheduleExpr string) { + j.rec.ScheduleExpr = scheduleExpr + j.markDirty("schedule_expr") +} + // HasRecurringSchedule returns true if this schedule job runs periodically. func (j *ScheduledJob) HasRecurringSchedule() bool { return len(j.rec.ScheduleExpr) > 0 diff --git a/pkg/jobs/scheduled_job_test.go b/pkg/jobs/scheduled_job_test.go index 8dd974eebf14..6b4608a14719 100644 --- a/pkg/jobs/scheduled_job_test.go +++ b/pkg/jobs/scheduled_job_test.go @@ -31,7 +31,7 @@ func TestCreateScheduledJob(t *testing.T) { schedules := ScheduledJobDB(h.cfg.DB) j := h.newScheduledJob(t, "test_job", "test sql") - require.NoError(t, j.SetSchedule("@daily")) + require.NoError(t, j.SetScheduleAndNextRun("@daily")) require.NoError(t, schedules.Create(context.Background(), j)) require.True(t, j.ScheduleID() > 0) } @@ -43,7 +43,7 @@ func TestCreatePausedScheduledJob(t *testing.T) { defer cleanup() j := h.newScheduledJob(t, "test_job", "test sql") - require.NoError(t, j.SetSchedule("@daily")) + require.NoError(t, j.SetScheduleAndNextRun("@daily")) schedules := ScheduledJobDB(h.cfg.DB) j.Pause() require.NoError(t, schedules.Create(context.Background(), j)) @@ -60,7 +60,7 @@ func TestSetsSchedule(t *testing.T) { j := h.newScheduledJob(t, "test_job", "test sql") // Set job schedule to run "@daily" -- i.e. at midnight. - require.NoError(t, j.SetSchedule("@daily")) + require.NoError(t, j.SetScheduleAndNextRun("@daily")) // The job is expected to run at midnight the next day. // We want to ensure nextRun correctly persisted in the cron table. @@ -101,7 +101,7 @@ func TestPauseUnpauseJob(t *testing.T) { schedules := ScheduledJobDB(h.cfg.DB) ctx := context.Background() j := h.newScheduledJob(t, "test_job", "test sql") - require.NoError(t, j.SetSchedule("@daily")) + require.NoError(t, j.SetScheduleAndNextRun("@daily")) require.NoError(t, schedules.Create(ctx, j)) // Pause and save. diff --git a/pkg/sql/alter_table.go b/pkg/sql/alter_table.go index 1f08af115651..ba08531d76e9 100644 --- a/pkg/sql/alter_table.go +++ b/pkg/sql/alter_table.go @@ -2114,7 +2114,7 @@ func handleTTLStorageParamChange( if err != nil { return false, err } - if err := s.SetSchedule(after.DeletionCronOrDefault()); err != nil { + if err := s.SetScheduleAndNextRun(after.DeletionCronOrDefault()); err != nil { return false, err } if err := schedules.Update(params.ctx, s); err != nil { diff --git a/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/controller.go b/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/controller.go index cd5519dcc1c6..ce3444ba438c 100644 --- a/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/controller.go +++ b/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/controller.go @@ -159,7 +159,7 @@ func updateSchedule(ctx context.Context, db isql.DB, st *cluster.Settings, clust if sj.ScheduleExpr() == cronExpr { return nil } - if err := sj.SetSchedule(cronExpr); err != nil { + if err := sj.SetScheduleAndNextRun(cronExpr); err != nil { return err } sj.SetScheduleStatus(string(jobs.StatusPending)) @@ -219,7 +219,7 @@ func CreateSchemaTelemetrySchedule( scheduledJob := jobs.NewScheduledJob(scheduledjobs.ProdJobSchedulerEnv) schedule := SchemaTelemetryRecurrence.Get(&st.SV) - if err := scheduledJob.SetSchedule(schedule); err != nil { + if err := scheduledJob.SetScheduleAndNextRun(schedule); err != nil { return nil, err } diff --git a/pkg/sql/create_table.go b/pkg/sql/create_table.go index c9a46bbd5da7..9c74d05b44c8 100644 --- a/pkg/sql/create_table.go +++ b/pkg/sql/create_table.go @@ -2514,7 +2514,7 @@ func newRowLevelTTLScheduledJob( CreationClusterVersion: clusterVersion, }) - if err := sj.SetSchedule(tblDesc.RowLevelTTL.DeletionCronOrDefault()); err != nil { + if err := sj.SetScheduleAndNextRun(tblDesc.RowLevelTTL.DeletionCronOrDefault()); err != nil { return nil, err } args := &catpb.ScheduledRowLevelTTLArgs{ diff --git a/pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go b/pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go index f7cfff7bd4db..19c6ad3e81c3 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go +++ b/pkg/sql/sqlstats/persistedsqlstats/compaction_scheduling.go @@ -47,7 +47,7 @@ func CreateSQLStatsCompactionScheduleIfNotYetExist( schedule := scheduledjobs.MaybeRewriteCronExpr( clusterID, SQLStatsCleanupRecurrence.Get(&st.SV), ) - if err := compactionSchedule.SetSchedule(schedule); err != nil { + if err := compactionSchedule.SetScheduleAndNextRun(schedule); err != nil { return nil, err } diff --git a/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go b/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go index 310802fae898..f62f68a65cf4 100644 --- a/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go +++ b/pkg/sql/sqlstats/persistedsqlstats/scheduled_job_monitor.go @@ -178,7 +178,7 @@ func (j *jobMonitor) updateSchedule(ctx context.Context, cronExpr string) { if sj.ScheduleExpr() == cronExpr { return nil } - if err := sj.SetSchedule(cronExpr); err != nil { + if err := sj.SetScheduleAndNextRun(cronExpr); err != nil { return err } sj.SetScheduleStatus(string(jobs.StatusPending)) From 771b4f04f63d4a3e441772cdabbc4a5d4ccf5529 Mon Sep 17 00:00:00 2001 From: Kevin Cao Date: Thu, 12 Dec 2024 12:36:15 -0500 Subject: [PATCH 2/2] backupccl: add test for alter backup schedule into new URI We previously were missing test coverage for ensuring that upon altering the collection URI for *active* schedules, the incremental schedule is paused, waiting until the full backup completes. Epic: none Release note: none --- pkg/backup/alter_backup_schedule_test.go | 29 ++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/pkg/backup/alter_backup_schedule_test.go b/pkg/backup/alter_backup_schedule_test.go index 2458fe7da32d..75a8a4981efb 100644 --- a/pkg/backup/alter_backup_schedule_test.go +++ b/pkg/backup/alter_backup_schedule_test.go @@ -242,6 +242,35 @@ func TestAlterBackupScheduleDoesNotResumePausedSchedules(t *testing.T) { }) } +func TestAlterBackupSchedulePausesIncrementalForNewCollection(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + th, cleanup := newAlterSchedulesTestHelper(t, nil) + defer cleanup() + + createCmd := "CREATE SCHEDULE FOR BACKUP INTO 'nodelocal://1/backup/alter-schedule' RECURRING '@hourly' FULL BACKUP '@daily';" + rows := th.sqlDB.QueryStr(t, createCmd) + require.Len(t, rows, 2) + incID, err := strconv.Atoi(rows[0][0]) + require.NoError(t, err) + fullID, err := strconv.Atoi(rows[1][0]) + require.NoError(t, err) + + // Artificially resume inc schedule to test if it gets paused after the alter + th.sqlDB.Exec(t, `RESUME SCHEDULE $1`, incID) + + alterCmd := fmt.Sprintf(`ALTER BACKUP SCHEDULE %d SET INTO 'nodelocal://1/backup/alter-schedule-2';`, fullID) + th.sqlDB.Exec(t, alterCmd) + + incStatus, incRecurrence := scheduleStatusAndRecurrence(t, th, incID) + require.Equal(t, "PAUSED", incStatus) + require.Equal(t, "@hourly", incRecurrence) + fullStatus, fullRecurrence := scheduleStatusAndRecurrence(t, th, fullID) + require.Equal(t, "ACTIVE", fullStatus) + require.Equal(t, "@daily", fullRecurrence) +} + func scheduleStatusAndRecurrence( t *testing.T, th *alterSchedulesTestHelper, id int, ) (status string, recurrence string) {